最初の実行で大量のデータを処理する必要があるストリーミング ジョブがあります。DoFn の 1 つは、バッチ リクエストをサポートするリモート サービスを呼び出すため、境界付きコレクションを操作する場合は、次のアプローチを使用します。
private static final class Function extends DoFn<String, Void> implements Serializable {
private static final long serialVersionUID = 2417984990958377700L;
private static final int LIMIT = 500;
private transient Queue<String> buffered;
@StartBundle
public void startBundle(Context context) throws Exception {
buffered = new LinkedList<>();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
buffered.add(context.element());
if (buffered.size() > LIMIT) {
flush();
}
}
@FinishBundle
public void finishBundle(Context c) throws Exception {
// process remaining
flush();
}
private void flush() {
// build batch request
while (!buffered.isEmpty()) {
buffered.poll();
// do something
}
}
}
無制限のコレクションで同じアプローチを使用できるように、データをウィンドウ化する方法はありますか?
私は次のことを試しました:
pipeline
.apply("Read", Read.from(source))
.apply(WithTimestamps.of(input -> Instant.now()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
.apply("Process", ParDo.of(new Function()));
しかしstartBundle、finishBundleすべての要素に対して呼び出されます。RxJava (2 分ウィンドウまたは 100 要素バンドル) のようなものを持つ可能性はありますか:
source
.toFlowable(BackpressureStrategy.LATEST)
.buffer(2, TimeUnit.MINUTES, 100)