問題タブ [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
2693 参照

google-cloud-dataflow - Dataflow のワーカー数を指定するにはどうすればよいですか?

約 90 GB の大きなインポート ファイルをロードする Apache Beam パイプラインがあります。Apache Beam Java SDK でパイプラインを作成しました。

のデフォルト設定を使用するPipelineOptionsFactoryと、ジョブが完了するまでにかなりの時間がかかります。

ジョブの並列処理、つまりワーカー数を制御し、プログラムで指定するにはどうすればよいですか?

0 投票する
1 に答える
922 参照

google-cloud-dataflow - BigQuery からの Cloud Dataflow ジョブの読み取りが開始前にスタックする

アプリケーション ロジックを実行する前に、開始フェーズでスタックしている Cloud Dataflow ジョブがあります。ステップ内にログ出力ステートメントを追加してこれをテストしましたprocessElementが、ログに表示されないため、到達していないようです。

ログに表示されるのは次のメッセージだけです。これは毎分表示されます。

logger: Starting supervisor: /etc/supervisor/supervisord_watcher.sh: line 36: /proc//oom_score_adj: Permission denied

そして、これらは数秒ごとにループします:

VM is healthy? true.

http: TLS handshake error from 172.17.0.1:38335: EOF

Job is in state JOB_STATE_RUNNING, will check again in 30 seconds.

ジョブ ID は ですが、午前中に開始した別の2015-09-14_06_30_22-152758842226623989732 つのジョブ ( 2015-09-14_05_59_30-11021392791304643671、 ) も同じ問題を抱えています。2015-09-14_06_08_41-3621035073455045662

これを引き起こしている可能性のあるアイデアはありますか?

0 投票する
2 に答える
23467 参照

apache-beam - アパッチビームとは?

Apache の投稿を調べていて、Beam という新しい用語を見つけました。Apache Beam とは何かを説明できる人はいますか? 私はグーグルアウトを試みましたが、明確な答えを得ることができませんでした.

0 投票する
0 に答える
1768 参照

apache-kafka - FlinkKafkaProducer/FlinkKafkaConsumer を使用した avro データのシリアライズとデシリアライズに関する問題

BeamをFlinkKafkaProducer介して Avro データを読み書きしているときに、いくつかの問題に直面しています。FlinkKafkaConsumer

FlinkKafkaProducer誰かがAvroスキーマの実例と使用例を指摘できれば素晴らしいでしょうFlinkKafkaConsumer(Kafkaのコンフルエント版を使用していません)

A) BeamKafkaFlinkAvroProducerTest (プロデューサー)

KafkaProducer を直接使用した場合 (つまり、 ProduceSimpleData を呼び出した場合)、問題なく動作しています (テストのためだけに)。次の手順でUnboundedSource として使用FlinkKafkaProducerします (これが私がすべきことです) (つまり、 ProduceAvroData2を呼び出します)。

  1. まず、私が使用する場合AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

    つまり、基本的に Avro を使用しorg.apache.avro.specific.SpecificDatumWriterます。次のエラーに直面しています:

    /li>
  2. 次に、TypeInformationSerializationSchema(パイプラインの AvroCoder に関係なく) 使用すると、Kafka テスト コンシューマー ツールがメッセージを出力するため、問題なく動作するようです。

    /li>

B) BeamKafkaFlinkAvroConsumerTest (消費者)

TypeInformationSerializationSchemaコンシューマーとプロデューサーの両方で使用するか、コンシューマーとプロデューサーでそれぞれ and を使用するAvroDeserializationSchema必要があることを理解してAvroSerializationSchemaいます。

しかし、AvroDeserializationSchemaorの使用に関係なくTypeInformationSerializationSchema、次の例外が発生します。


非常に基本的なものが欠けている可能性があります。すべてのコードはこちらです。

0 投票する
1 に答える
2623 参照

java - Dataflow/Beam でストリーミング データと大規模な履歴データ セットを組み合わせる方法

Google Dataflow/Apache Beam を介した Web ユーザー セッションからの処理ログを調査しており、ユーザーのログ (ストリーミング) を先月のユーザー セッションの履歴と組み合わせる必要があります。

私は次のアプローチを見てきました:

  1. 30 日間の固定ウィンドウを使用します。ほとんどの場合、メモリに収まるウィンドウが大きくなります。ユーザーの履歴を更新する必要はありません。参照するだけです。
  2. CoGroupByKey を使用して 2 つのデータ セットを結合しますが、2 つのデータ セットのウィンドウ サイズは同じでなければなりません ( https://cloud.google.com/dataflow/model/group-by-key#join )。ケース (24 時間 vs 30 日)
  3. elementSide Input を使用して、特定の inのユーザーのセッション履歴を取得します。processElement(ProcessContext processContext)

私の理解では、経由でロードされたデータは.withSideInputs(pCollectionView)メモリに収まる必要があります。1 人のユーザーのすべてのセッション履歴をメモリに格納できることはわかっていますが、すべてのセッション履歴を格納できるわけではありませ

私の質問は、現在のユーザー セッションにのみ関連する副入力からデータをロード/ストリーミングする方法があるかどうかです。

ユーザーの ID を指定して、サイド入力からユーザーの履歴セッションをロードする parDo 関数を想像しています。ただし、現在のユーザーの履歴セッションのみがメモリに収まります。副入力を介してすべての履歴セッションをロードすると、大きすぎます。

説明するためのいくつかの擬似コード:

0 投票する
2 に答える
1423 参照

google-cloud-dataflow - モックを Dataflow テストへの入力として使用する良い方法はありますか?

DoFn<KV<String, twitter4j.Status>, String>実装をテストし、テスト データを入力として提供しようとしています。私が模索していた 1 つの方法は、Mockito.mockオブジェクトを入力として使用することでした。それ以外の方法で実装する抽象メソッドが多数あるためです。ただし、 my でモックされたメソッドを呼び出すとDoFnオブジェクトが変更されるため、テスト フレームワークは「出力後に値を変更してはならない」と不平を言います。

ここで試していることを達成する別の方法はありますか? テストコードは大まかに次のとおりです。