7

最初の実行で大量のデータを処理する必要があるストリーミング ジョブがあります。DoFn の 1 つは、バッチ リクエストをサポートするリモート サービスを呼び出すため、境界付きコレクションを操作する場合は、次のアプローチを使用します。

  private static final class Function extends DoFn<String, Void> implements Serializable {
    private static final long serialVersionUID = 2417984990958377700L;

    private static final int LIMIT = 500;

    private transient Queue<String> buffered;

    @StartBundle
    public void startBundle(Context context) throws Exception {
      buffered = new LinkedList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
      buffered.add(context.element());

      if (buffered.size() > LIMIT) {
        flush();
      }
    }

    @FinishBundle
    public void finishBundle(Context c) throws Exception {
      // process remaining
      flush();
    }

    private void flush() {
      // build batch request
      while (!buffered.isEmpty()) {
        buffered.poll();
        // do something
      }
    }
  }

無制限のコレクションで同じアプローチを使用できるように、データをウィンドウ化する方法はありますか?

私は次のことを試しました:

pipeline
    .apply("Read", Read.from(source))
    .apply(WithTimestamps.of(input -> Instant.now()))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
    .apply("Process", ParDo.of(new Function()));

しかしstartBundlefinishBundleすべての要素に対して呼び出されます。RxJava (2 分ウィンドウまたは 100 要素バンドル) のようなものを持つ可能性はありますか:

source
    .toFlowable(BackpressureStrategy.LATEST)
    .buffer(2, TimeUnit.MINUTES, 100) 
4

2 に答える 2