0

複数の無制限のソースと副入力を備えたデータ パイプラインを実装し、スライディング ウィンドウ (30 秒と 10 秒ごと) でデータを結合し、変換された出力を Kafka トピックに出力しました。私が抱えている問題は、ウィンドウの最初の 10 秒間に受信したデータが 3 回 (つまり) 新しいウィンドウが開始されるたびに、最初のウィンドウが完了するまでトリガーされることです。変換されたデータを一度だけ出力する方法、または重複を避ける方法は?

私は破棄されたペインを使用しましたが、違いはありません。ウィンドウを閉じる動作を FIRE_ALWAYS/FIRE_IF_NON_EMPTY に設定しようとすると、次のエラーがスローされます。

スレッド "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException での例外: java.lang.IllegalArgumentException: 空の PCollection がシングルトン ビューとしてアクセスされました。org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332) で org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner. java:302) org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) で org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) で org.apache .beam.sdk.Pipeline.run(Pipeline.java:313) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) at y.yyy.main(yyy.java:86) 原因: java.lang.IllegalArgumentException: 空の PCollection がシングルトン ビューとしてアクセスされました。

data.apply("Transform", ParDo.of(
  new DoFn<String, Row>() {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(
      ProcessContext processContext,
      final OutputReceiver<Row> emitter) {

        String record = processContext.element();
        final String[] parts = record.split(",");
        emitter.output(Row.withSchema(sch).addValues(parts).build());
    }
  })).apply(
    "window1",
    Window
      .<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(10)))
      .withAllowedLateness(
        Duration.ZERO,
        Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
  .discardingFiredPanes());

ウィンドウを一度だけトリガーするように親切にガイドしてください(つまり、すでに処理されたレコードを送信したくない)

更新: Side Input の上記のエラーが頻繁に発生します。Windows が原因ではなく、Apache Beam の問題のようです ( https://issues.apache.org/jira/browse/BEAM-6086 )

行が既に処理されているかどうかを識別するために State を使用しようとしましたが、状態が保持されていないか、設定されていません。(つまり) 状態の読み取り中に常に null を取得します。

public class CheckState extends DoFn<KV<String,String>,KV<Integer,String>> {
  private static final long serialVersionUID = 1L;

  @StateId("count")
  private final StateSpec<ValueState<String>> countState =
                     StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
    ProcessContext processContext,
    @StateId("count") ValueState<String> countState) {

        KV<String,String> record = processContext.element();
        String row = record.getValue();
        System.out.println("State: " + countState.read());
        System.out.println("Setting state as "+ record.getKey() + " for value"+ row.split(",")[0]);
        processContext.output(KV.of(current, row));
        countState.write(record.getKey());
    }
4

1 に答える 1