1

Apache Beam Java SDK を使用して 2 つの無制限のソースを結合しようとしています。参加中に以下のエラー メッセージが表示されます。

スレッド「メイン」での例外 java.lang.UnsupportedOperationException: 無制限の PCollection への結合は、現在、遅延が許可されていないデフォルトのトリガーなど、ウィンドウごとに 1 回出力を生成することが知られているトリガーを持つ非グローバル ウィンドウに対してのみサポートされています。このような場合、Beam はすべての入力要素をウィンドウごとに 1 回結合することを保証できます。WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117、allowedLateness=PT0S、trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 分))、accumulationMode=DISCARDING_FIRED_PANES、timestampCombiner=EARLIEST } は、org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel の org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) ではサポートされていません。

固定ウィンドウとスライディング ウィンドウの両方をトリガー (pastEndOfWindow & pastFirstElementInPane) と一緒に使用して、遅延をゼロにしてみました。Accumalate と Discard の両方の起動済みペインを試しました。毎回同じエラー メッセージが表示されます。

以下は、固定ウィンドウとスライド ウィンドウの両方を使用して試した 2 つのスニペットです。

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

私は単純に、スライディング ウィンドウを使用して SQL 変換を実装したかったのです。遅延をトリガーし、遅延を許可します。それを実装するために私を親切に案内してください。

ありがとう、ゴーサム

4

2 に答える 2