現在、Python Apache Beam パイプラインが機能しており、ローカルで実行できます。現在、パイプラインを Google Cloud Dataflow で実行し、完全に自動化しようとしていますが、Dataflow/Apache Beam のパイプライン モニタリングに制限があることがわかりました。
現在、Cloud Dataflowには、パイプラインのステータスをモニタリングする方法が 2 つあります。UI インターフェースを使用する方法と、コマンドラインで gcloud を使用する方法です。これらのソリューションはどちらも、完全に自動化されたソリューションではうまく機能しません。ロスのないファイル処理を説明できます.
Apache Beam の github を見ると、ファイルinternal/apiclient.pyがあり、ジョブのステータスを取得するために使用される関数 get_job があることを示しています。
get_job が使用されていることがわかった 1 つのインスタンスは、runners/dataflow_runner.pyにあります。
最終的な目標は、この API を使用して 1 つまたは複数のジョブのステータスを取得し、自動的に実行をトリガーして、最終的にすべてのジョブがパイプラインを介して正常に処理されるようにすることです。
パイプラインを実行した後、この API をどのように使用できるか説明してくれる人はいますp.run()
か ( )。runner
inがどこから来るのかわかりませんresponse = runner.dataflow_client.get_job(job_id)
。
パイプラインのセットアップ/実行中にこの API 呼び出しにアクセスする方法について、誰かがより深く理解できるとしたら、それは素晴らしいことです!