問題タブ [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
446 参照

google-cloud-dataflow - バッチ処理パイプラインで最大より大きいメッセージを受信しました

Google のクラウド データフロー サービスで毎日実行されているバッチ処理パイプラインでこのメッセージを受け取りました。次のメッセージで失敗し始めました。

私はまだ古い回避策を使用して、次のようなヘッダーを含む CSV ファイルを出力しています。

これは何が原因ですか?この DoFn の出力は大きすぎますか? 処理中のデータセットのサイズは増加していません。

0 投票する
1 に答える
272 参照

google-cloud-dataflow - Python SDK でカスタム シンクをデバッグする方法

Python SDK でカスタム シンクを作成する際に問題があります。デバッグには logging.info や print メソッドを使用しましたが、うまく動作しませんでした。カスタム シンクをデバッグする良い方法はありますか?

0 投票する
1 に答える
897 参照

python - Python Apache Beam パイプライン ステータス API 呼び出し

現在、Python Apache Beam パイプラインが機能しており、ローカルで実行できます。現在、パイプラインを Google Cloud Dataflow で実行し、完全に自動化しようとしていますが、Dataflow/Apache Beam のパイプライン モニタリングに制限があることがわかりました。

現在、Cloud Dataflowには、パイプラインのステータスをモニタリングする方法が 2 つあります。UI インターフェースを使用する方法と、コマンドラインで gcloud を使用する方法です。これらのソリューションはどちらも、完全に自動化されたソリューションではうまく機能しません。ロスのないファイル処理を説明できます.

Apache Beam の github を見ると、ファイルinternal/apiclient.pyがあり、ジョブのステータスを取得するために使用される関数 get_job があることを示しています

get_job が使用されていることがわかった 1 つのインスタンスは、runners/dataflow_runner.pyにあります。

最終的な目標は、この API を使用して 1 つまたは複数のジョブのステータスを取得し、自動的に実行をトリガーして、最終的にすべてのジョブがパイプラインを介して正常に処理されるようにすることです。

パイプラインを実行した後、この API をどのように使用できるか説明してくれる人はいますp.run()か ( )。runnerinがどこから来るのかわかりませんresponse = runner.dataflow_client.get_job(job_id)

パイプラインのセットアップ/実行中にこの API 呼び出しにアクセスする方法について、誰かがより深く理解できるとしたら、それは素晴らしいことです!

0 投票する
1 に答える
518 参照

google-cloud-dataflow - Apache Beam: A の出力を B に渡さずに、ステップ B をステップ A に依存させることはできますか?

A が出力を生成しない場合に、PTransform B を PTransform A に依存させる方法はありますか? それとも、副入力として B に供給されるダミー出力を A に生成させる必要がありますか? ユースケースの例は、次のパイプラインが必要な場合です。

A が終了した後にのみ B を開始したいのですが、A は B にとって有用な出力 PCollection を生成しません。

0 投票する
1 に答える
711 参照

google-cloud-dataflow - Apache Beam: 構築済みのパイプラインでステップをスキップする

既に構築されたパイプラインのステップを条件付きでスキップする方法はありますか? それとも、実行するステップを制御する唯一の方法としてパイプラインの構築が設計されているのでしょうか?

0 投票する
1 に答える
1909 参照

python - Python でデータフロー/ビームからデータストアをクエリする方法

Google は、Python で dataflow/beam からデータストアをクエリするためのサポートをリリースしたようです。ローカルで実行しようとしていますが、いくつかの問題が発生しています。

これは私に与えています

私は間違ったクエリ オブジェクトを渡していると推測しています (データストアをインポートできる 3 ~ 4 個の pip パッケージがあります) が、どれを渡すべきかわかりません。テストでは、彼らはprotobufを渡しています。それは私が使用しなければならないものですか?それが私がしなければならないことである場合、誰でもprotobufを使用した簡単なクエリの例を示すことができますか?

0 投票する
2 に答える
11244 参照

python - Apache Beam Dataflow で csv を辞書に変換する方法

csv ファイルを読み取り、Apache Beam データフローを使用して BigQuery に書き込みたいと考えています。これを行うには、データを辞書の形式で BigQuery に提示する必要があります。これを行うためにApacheビームを使用してデータを変換するにはどうすればよいですか?

私の入力 csv ファイルには 2 つの列があり、BigQuery で後続の 2 列のテーブルを作成したいと考えています。私は BigQuery でデータを作成する方法を知っています。それは簡単です。私が知らないのは、csv を辞書に変換する方法です。以下のコードは正しくありませんが、私が何をしようとしているのかがわかります。