目的:
ストリーム データを読み込み、キーを追加して、キーごとにカウントしたい。
問題:
ストリーミング アプローチ (無制限のデータ) を使用して大きなサイズのデータを読み込んでキーごとにグループ化しようとすると、Apache Beam Dataflow パイプラインでメモリ エラーが発生します。データがグループバイに蓄積されているようで、各ウィンドウのトリガーでデータを先に発射しないためです。
要素のサイズを小さくすると (要素の数は変わりません)、うまくいきます! 実際には group-by step はすべてのデータがグループ化されるのを待ってから、すべての新しいウィンドウ化されたデータを起動するためです。
私は両方でテストしました:
ビーム バージョン 2.11.0 および scio バージョン 0.7.4
ビーム バージョン 2.6.0 および scio バージョン 0.6.1
エラーを再生成する方法:
- ファイル名を含む Pubsub メッセージを読み取る
- 関連ファイルをGCSから行ごとの反復子として読み取ってロードします
- 行ごとにフラット化する (約 10,000 の要素を生成する)
- 要素にタイムスタンプ (現在の時刻) を追加する
- データのキー値を作成します (1 から 10 までのランダムな整数キーを使用)
- トリガーを使用してウィンドウを適用します (行が小さく、メモリに問題がない場合、約 50 回トリガーされます)
- キーごとにカウントします(キーごとにグループ化してから結合します)
- 最後に、ウィンドウとキーごとのカウントを表す約 50 * 10 の要素があると想定しました (行サイズが十分に小さい場合、正常にテストされました)。
パイプラインの視覚化 (ステップ 4 から 7 ):
group-by-key ステップの要約:
ご覧のとおり、データはグループごとに蓄積され、出力されません。
ウィンドウ処理コードは次のとおりです。
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
エラー:
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
グループバイに各ウィンドウの初期結果を強制的に発行させることで、メモリの問題を解決する解決策はありますか。