3

Google Cloud Dataflow SDK を使用してストリーミング パイプラインを作成しましたが、パイプラインをローカルでテストしたいと考えています。私のパイプラインは、Google Pub/Sub から入力データを受け取ります。

DirectPipelineRunner を使用して Pub/Sub (pubsubIO) にアクセスするジョブを実行することはできますか (Google Cloud ではなくローカルで実行)?

通常のユーザー アカウントとしてログインしているときに、アクセス許可の問題が発生しています。アクセスしようとしている pub/sub トピックのプロジェクトの所有者です。

4

4 に答える 4

3

InProcessPipelineRunnerは、無制限の PCollection のサポートを含む、 Dataflow SDK for Java 1.6.0で導入された DirectPipelineRunnerの新しいバージョンです。

(注: Apache Beam では、この機能は既に DirectRunner に追加されていますが、Dataflow SDK for Java では、2.0 までそれを行うことができません。これは、モデルのより良いチェックにより、追加のテストの失敗が発生する可能性があるためです。下位互換性のない変更のため、当面はコンパニオン InProcessPipelineRunner を追加します。)

また、遅れたデータや順不同のデータをテストするための優れた新しいサポートもいくつかあります。

于 2016-10-27T16:58:43.397 に答える
0

これを探している人を助けるために、

最新バージョンでは、これを行うことができます。パイプラインをローカルで実行する場合は、「DirectRunner」を使用してこれをローカルで実行します。これをクラウドで実行するには、「DataflowRunner」を使用します。

以下に示すように、ステージング場所とランナーを設定します。

streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION);

streamingOption.setRunner(DataflowRunner.class);

または引数として渡します。

直面した許可の問題についてもう少し詳しく説明していただけますか?

于 2017-11-13T11:49:59.067 に答える
-1

実際には可能ですが、DirectPipelineRunner は無制限のデータソースをサポートしていません。したがって、次のように設定する必要がありますmaxReadTimemaxNumRecords

PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000);

PubSub のドキュメントから:

Cloud Pub/Sub ストリームから継続的に読み取り、ストリームからのアイテムを含む文字列の PCollection を返す PTransform。限定された PCollection (DirectPipelineRunner など) のみをサポートする PipelineRunner で実行する場合、入力 Pub/Sub ストリームの限定された部分のみを処理できます。そのため、PubsubIO.Read.Bound.maxNumRecords(int) または PubsubIO.Read.Bound.maxReadTime(Duration) を設定する必要があります。

于 2016-10-12T21:33:03.610 に答える