問題タブ [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - Dataflow のワーカー数を指定するにはどうすればよいですか?
約 90 GB の大きなインポート ファイルをロードする Apache Beam パイプラインがあります。Apache Beam Java SDK でパイプラインを作成しました。
のデフォルト設定を使用するPipelineOptionsFactory
と、ジョブが完了するまでにかなりの時間がかかります。
ジョブの並列処理、つまりワーカー数を制御し、プログラムで指定するにはどうすればよいですか?
google-cloud-dataflow - BigQuery からの Cloud Dataflow ジョブの読み取りが開始前にスタックする
アプリケーション ロジックを実行する前に、開始フェーズでスタックしている Cloud Dataflow ジョブがあります。ステップ内にログ出力ステートメントを追加してこれをテストしましたprocessElement
が、ログに表示されないため、到達していないようです。
ログに表示されるのは次のメッセージだけです。これは毎分表示されます。
logger: Starting supervisor: /etc/supervisor/supervisord_watcher.sh: line 36: /proc//oom_score_adj: Permission denied
そして、これらは数秒ごとにループします:
VM is healthy? true.
http: TLS handshake error from 172.17.0.1:38335: EOF
Job is in state JOB_STATE_RUNNING, will check again in 30 seconds.
ジョブ ID は ですが、午前中に開始した別の2015-09-14_06_30_22-15275884222662398973
2 つのジョブ ( 2015-09-14_05_59_30-11021392791304643671
、 ) も同じ問題を抱えています。2015-09-14_06_08_41-3621035073455045662
これを引き起こしている可能性のあるアイデアはありますか?
apache-beam - アパッチビームとは?
Apache の投稿を調べていて、Beam という新しい用語を見つけました。Apache Beam とは何かを説明できる人はいますか? 私はグーグルアウトを試みましたが、明確な答えを得ることができませんでした.
apache-kafka - FlinkKafkaProducer/FlinkKafkaConsumer を使用した avro データのシリアライズとデシリアライズに関する問題
BeamをFlinkKafkaProducer
介して Avro データを読み書きしているときに、いくつかの問題に直面しています。FlinkKafkaConsumer
FlinkKafkaProducer
誰かがAvroスキーマの実例と使用例を指摘できれば素晴らしいでしょうFlinkKafkaConsumer
(Kafkaのコンフルエント版を使用していません)
A) BeamKafkaFlinkAvroProducerTest (プロデューサー)
KafkaProducer を直接使用した場合 (つまり、 ProduceSimpleData を呼び出した場合)、問題なく動作しています (テストのためだけに)。次の手順でUnboundedSource として使用FlinkKafkaProducer
します (これが私がすべきことです) (つまり、 ProduceAvroData2を呼び出します)。
まず、私が使用する場合
AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
つまり、基本的に Avro を使用し
/li>org.apache.avro.specific.SpecificDatumWriter
ます。次のエラーに直面しています:次に、
/li>TypeInformationSerializationSchema
(パイプラインの AvroCoder に関係なく) 使用すると、Kafka テスト コンシューマー ツールがメッセージを出力するため、問題なく動作するようです。
B) BeamKafkaFlinkAvroConsumerTest (消費者)
TypeInformationSerializationSchema
コンシューマーとプロデューサーの両方で使用するか、コンシューマーとプロデューサーでそれぞれ and を使用するAvroDeserializationSchema
必要があることを理解してAvroSerializationSchema
います。
しかし、AvroDeserializationSchema
orの使用に関係なくTypeInformationSerializationSchema
、次の例外が発生します。
非常に基本的なものが欠けている可能性があります。すべてのコードはこちらです。
java - Dataflow/Beam でストリーミング データと大規模な履歴データ セットを組み合わせる方法
Google Dataflow/Apache Beam を介した Web ユーザー セッションからの処理ログを調査しており、ユーザーのログ (ストリーミング) を先月のユーザー セッションの履歴と組み合わせる必要があります。
私は次のアプローチを見てきました:
- 30 日間の固定ウィンドウを使用します。ほとんどの場合、メモリに収まるウィンドウが大きくなります。ユーザーの履歴を更新する必要はありません。参照するだけです。
- CoGroupByKey を使用して 2 つのデータ セットを結合しますが、2 つのデータ セットのウィンドウ サイズは同じでなければなりません ( https://cloud.google.com/dataflow/model/group-by-key#join )。ケース (24 時間 vs 30 日)
element
Side Input を使用して、特定の inのユーザーのセッション履歴を取得します。processElement(ProcessContext processContext)
私の理解では、経由でロードされたデータは.withSideInputs(pCollectionView)
メモリに収まる必要があります。1 人のユーザーのすべてのセッション履歴をメモリに格納できることはわかっていますが、すべてのセッション履歴を格納できるわけではありません。
私の質問は、現在のユーザー セッションにのみ関連する副入力からデータをロード/ストリーミングする方法があるかどうかです。
ユーザーの ID を指定して、サイド入力からユーザーの履歴セッションをロードする parDo 関数を想像しています。ただし、現在のユーザーの履歴セッションのみがメモリに収まります。副入力を介してすべての履歴セッションをロードすると、大きすぎます。
説明するためのいくつかの擬似コード:
google-cloud-dataflow - モックを Dataflow テストへの入力として使用する良い方法はありますか?
DoFn<KV<String, twitter4j.Status>, String>
実装をテストし、テスト データを入力として提供しようとしています。私が模索していた 1 つの方法は、Mockito.mock
オブジェクトを入力として使用することでした。それ以外の方法で実装する抽象メソッドが多数あるためです。ただし、 my でモックされたメソッドを呼び出すとDoFn
オブジェクトが変更されるため、テスト フレームワークは「出力後に値を変更してはならない」と不平を言います。
ここで試していることを達成する別の方法はありますか? テストコードは大まかに次のとおりです。