3

目的:

ストリーム データを読み込み、キーを追加して、キーごとにカウントしたい。

問題:

ストリーミング アプローチ (無制限のデータ) を使用して大きなサイズのデータ​​を読み込んでキーごとにグループ化しようとすると、Apache Beam Dataflow パイプラインでメモリ エラーが発生します。データがグループバイに蓄積されているようで、各ウィンドウのトリガーでデータを先に発射しないためです。

要素のサイズを小さくすると (要素の数は変わりません)、うまくいきます! 実際には group-by step はすべてのデータがグループ化されるのを待ってから、すべての新しいウィンドウ化されたデータを起動するためです。

私は両方でテストしました:

ビーム バージョン 2.11.0 および scio バージョン 0.7.4

ビーム バージョン 2.6.0 および scio バージョン 0.6.1

エラーを再生成する方法:

  1. ファイル名を含む Pubsub メッセージを読み取る
  2. 関連ファイルをGCSから行ごとの反復子として読み取ってロードします
  3. 行ごとにフラット化する (約 10,000 の要素を生成する)
  4. 要素にタイムスタンプ (現在の時刻) を追加する
  5. データのキー値を作成します (1 から 10 までのランダムな整数キーを使用)
  6. トリガーを使用してウィンドウを適用します (行が小さく、メモリに問題がない場合、約 50 回トリガーされます)
  7. キーごとにカウントします(キーごとにグループ化してから結合します)
  8. 最後に、ウィンドウとキーごとのカウントを表す約 50 * 10 の要素があると想定しました (行サイズが十分に小さい場合、正常にテストされました)。

パイプラインの視覚化 (ステップ 4 から 7 ):

ここに画像の説明を入力

group-by-key ステップの要約:

ここに画像の説明を入力

ご覧のとおり、データはグループごとに蓄積され、出力されません。

ウィンドウ処理コードは次のとおりです。

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)

エラー:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

グループバイに各ウィンドウの初期結果を強制的に発行させることで、メモリの問題を解決する解決策はありますか。

4

1 に答える 1