3

現在、Python Apache Beam パイプラインが機能しており、ローカルで実行できます。現在、パイプラインを Google Cloud Dataflow で実行し、完全に自動化しようとしていますが、Dataflow/Apache Beam のパイプライン モニタリングに制限があることがわかりました。

現在、Cloud Dataflowには、パイプラインのステータスをモニタリングする方法が 2 つあります。UI インターフェースを使用する方法と、コマンドラインで gcloud を使用する方法です。これらのソリューションはどちらも、完全に自動化されたソリューションではうまく機能しません。ロスのないファイル処理を説明できます.

Apache Beam の github を見ると、ファイルinternal/apiclient.pyがあり、ジョブのステータスを取得するために使用される関数 get_job があることを示しています

get_job が使用されていることがわかった 1 つのインスタンスは、runners/dataflow_runner.pyにあります。

最終的な目標は、この API を使用して 1 つまたは複数のジョブのステータスを取得し、自動的に実行をトリガーして、最終的にすべてのジョブがパイプラインを介して正常に処理されるようにすることです。

パイプラインを実行した後、この API をどのように使用できるか説明してくれる人はいますp.run()か ( )。runnerinがどこから来るのかわかりませんresponse = runner.dataflow_client.get_job(job_id)

パイプラインのセットアップ/実行中にこの API 呼び出しにアクセスする方法について、誰かがより深く理解できるとしたら、それは素晴らしいことです!

4

1 に答える 1

3

コードをいじって、ジョブの詳細を取得する方法を見つけました。次のステップは、すべてのジョブのリストを取得する方法があるかどうかを確認することです。

# start the pipeline process
pipeline                 = p.run()
# get the job_id for the current pipeline and store it somewhere
job_id                   = pipeline.job_id()
# setup a job_version variable (either batch or streaming)
job_version              = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
# setup "runner" which is just a dictionary, I call it local
local                    = {}
# create a dataflow_client
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version)
# get the job details from the dataflow_client
print local['dataflow_client'].get_job(job_id)
于 2016-11-22T17:49:02.413 に答える