1

Pub/Subトピック + サブスクリプションがあり、 Dataflowのサブスクリプションから無制限のデータを消費して集約したいと考えています。固定ウィンドウを使用して、集計を BigQuery に書き込みます。

読み取りと書き込み (ウィンドウ処理と集計なし) は正常に機能します。しかし、(各ウィンドウ内の要素をカウントするために) データを固定ウィンドウにパイプすると、ウィンドウは決してトリガーされません。したがって、集計は書き込まれません。

これが私のWord Publisherです(例のkinglear.txtを入力ファイルとして使用しています):

public static class AddCurrentTimestampFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
    }
}

public static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        String[] words = c.element().split("[^a-zA-Z']+");
        for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
    }
}

// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();

これが私のウィンドウ付きワードカウンターです:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options

BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
        .withSchema(o.getSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)));

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))
        .apply("WriteRows", tablePipe);
p.run();

デフォルトのトリガーを使用してウィンドウがトリガーされないように見えるため、上記のサブスクライバーは機能しません。ただし、トリガーを手動で定義すると、コードが機能し、カウントが BigQuery に書き込まれます。

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

可能であれば、カスタム トリガーを指定することは避けたいと思います。

質問:

  1. ソリューションが Dataflow のデフォルト トリガーで機能しないのはなぜですか?
  2. パブリッシャーまたはサブスクライバーを変更して、デフォルトのトリガーを使用してウィンドウをトリガーするにはどうすればよいですか?
4

1 に答える 1