
I'm trying to read a Datastore table containing 300k records from Dataflow (DatastoreIO), but I get the following error from the Datastore API:

400 Bad Request: a composite filter must have at least one sub-filter

Code:

Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(options.getKind());
Query query = q.build();

(...)

Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.readFrom(options.getDataset(), query).named("ReadFromDatastore"))...

Error (occurs 4 times before the job terminates):

[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all >>>
[INFO] 
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all <<<
[INFO] 
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
Mar 14, 2015 8:58:48 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 41 files. Enable logging at DEBUG level to see which files will be staged.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 41 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 14, 2015 8:59:12 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 40 files cached
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source queryLatestStatisticsTimestamp
INFO: Query for latest stats timestamp of dataset primebus01 took 854ms
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source getEstimatedSizeBytes
INFO: Query for per-kind statistics took 233ms
Dataflow SDK version: manual_build
Mar 14, 2015 8:59:16 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/primebus01/dataflow/job/2015-03-14_04_59_16-991787963429613914
Submitted job: 2015-03-14_04_59_16-991787963429613914
2015-03-14T11:59:17.796Z: (ab0c86e705e7447a): Expanding GroupByKey operations into optimizable parts.
2015-03-14T11:59:17.800Z: (ab0c86e705e749b0): Annotating graph with Autotuner information.
2015-03-14T11:59:17.806Z: (ab0c86e705e74ee6): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-14T11:59:17.812Z: (ab0c86e705e7441c): Fusing consumer GetContent into ReadFromDatastore
2015-03-14T11:59:17.815Z: (ab0c86e705e74952): Fusing consumer CountWords/Count.PerElement/Init into CountWords/ExtractWords
2015-03-14T11:59:17.818Z: (ab0c86e705e74e88): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Read
2015-03-14T11:59:17.820Z: (ab0c86e705e743be): Fusing consumer WriteLines into CountWords/FormatCounts
2015-03-14T11:59:17.822Z: (ab0c86e705e748f4): Fusing consumer CountWords/FormatCounts into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract
2015-03-14T11:59:17.824Z: (ab0c86e705e74e2a): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Write into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput
2015-03-14T11:59:17.826Z: (ab0c86e705e74360): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract into CountWords/Count.PerElement/Sum.PerKey/GroupedValues
2015-03-14T11:59:17.828Z: (ab0c86e705e74896): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial
2015-03-14T11:59:17.830Z: (ab0c86e705e74dcc): Fusing consumer CountWords/ExtractWords into GetContent
2015-03-14T11:59:17.832Z: (ab0c86e705e74302): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial into CountWords/Count.PerElement/Init
2015-03-14T11:59:17.843Z: (ab0c86e705e74d10): Adding StepResource setup and teardown to workflow graph.
2015-03-14T11:59:17.850Z: (ab0c86e705e7477c): Not adding lease related steps.
2015-03-14T11:59:17.864Z: (ac5ca5b613993974): Starting the input generators.
2015-03-14T11:59:17.882Z: (9a0f95eb7a7962f5): Adding workflow start and stop steps.
2015-03-14T11:59:17.884Z: (9a0f95eb7a796623): Assigning stage ids.
2015-03-14T11:59:18.290Z: (eb8131b6a76f5248): Starting worker pool setup.
2015-03-14T11:59:18.295Z: (eb8131b6a76f53ac): Starting 3 workers...
2015-03-14T11:59:18.318Z: S01: (1174d086003eadad): Executing operation CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Create
2015-03-14T11:59:18.345Z: (d91fb5c6a16bad02): Value "CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Session" materialized.
2015-03-14T11:59:18.367Z: S02: (1174d086003ea94c): Executing operation ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum....
2015-03-14T12:00:19.839Z: (9db26953adb2a181): java.io.IOException: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:266)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:239)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:173)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:120)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:129)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:94)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:118)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:599)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:590)
    at com.google.cloud.dataflow.sdk.io.Source$WindowedReaderWrapper.start(Source.java:178)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:257)
    ... 14 more
Caused by: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:630)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:597)
    ... 17 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
a composite filter must have at least one sub-filter
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 21 more

2015-03-14T12:00:24.850Z: (7ffa05cd66cd6940): Failed task is going to be retried.


................................


2015-03-14T12:01:02.703Z: (1174d086003eaff0): Executing failure step failure1
2015-03-14T12:01:02.707Z: (1174d086003ea342): Workflow failed. Causes: (ac5ca5b613993837): Map task completion for Step "ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum...." failed. Causes: (9013e6edb4cda414): Task has been attempted unsuccessfully 4 times, the maximum allowed.
2015-03-14T12:01:02.742Z: (157f9c08481a25c5): Stopping the input generators.
2015-03-14T12:01:02.756Z: (697e8bf226b989af): Cleaning up.
2015-03-14T12:01:02.765Z: (697e8bf226b98f98): Tearing down pending resources...
2015-03-14T12:01:02.771Z: (697e8bf226b98581): Starting worker pool teardown.
2015-03-14T12:01:02.776Z: (697e8bf226b9880d): Stopping worker pool...

This raises a few further questions:

1 - Do I need to define a filter in this case?

2 - What criteria does Dataflow use to split the job?

3 - Is there an easy way to dump a large table out of Datastore?

Thanks.


2 Answers


I believe I've identified the problem you're running into:

  • There is a bug in the way DatastoreIO queries are split (it uses a QuerySplitter; under certain conditions, in particular when the query returns very few results, it can generate invalid queries with an empty composite filter for the parallel shards). I've notified the Datastore team about this bug, and they are working on it.
  • According to your code snippet, your query simply reads all entities of a particular kind from the dataset. The only way it could trigger the problem above is if the query returns zero results, which is possible if you are using namespaces. Unfortunately, the QuerySplitter also does not currently support namespaces. The team is looking into removing this limitation, but there is no timeline for when that will ship.
  • The error message from the Dataflow SDK should make it much clearer what is going on, and the SDK should detect these errors earlier. We will fix that.

For now, if you are using namespaces, it looks like you are out of luck as far as parallelizing the read from Datastore goes :( If the bulk of your processing happens afterwards (i.e., you do a significant amount of work per Datastore entity), your best bet at the moment is to write one (essentially non-parallel) pipeline that dumps the entities to GCS, and a second pipeline that reads the entities from there and processes them in parallel.

The first pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(DatastoreIO.readFrom(dataset, query).named("ReadFromDatastore"))
        .apply(ParDo.of(new ConvertEntitiesFn())).setCoder(AvroCoder.of(MyEntity.class))
        .apply(AvroIO.Write.named("WriteEntitiesToGCS")
                           .to("gs://your-bucket/path/to/temp.avro")
                           .withSchema(MyEntity.class)
                           .withNumShards(1));
pipeline.run();

Here, MyEntity is a class representing your entity kind, and ConvertEntitiesFn is a DoFn<DatastoreV1.Entity, MyEntity> that does the conversion.
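
As a rough sketch only (the "content" property and the MyEntity fields below are hypothetical placeholders for whatever your kind actually stores), the two pieces might look like this:

import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Avro-serializable POJO mirroring the entity kind
// (nested inside your pipeline class).
static class MyEntity {
  String content;
  public MyEntity() {}  // no-arg constructor required by AvroCoder
  public MyEntity(String content) { this.content = content; }
}

// Converts each raw Datastore entity into the POJO above.
static class ConvertEntitiesFn extends DoFn<DatastoreV1.Entity, MyEntity> {
  @Override
  public void processElement(ProcessContext c) {
    for (DatastoreV1.Property prop : c.element().getPropertyList()) {
      // "content" is an assumed property name; substitute your own.
      if ("content".equals(prop.getName())) {
        c.output(new MyEntity(prop.getValue().getStringValue()));
      }
    }
  }
}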

Run this pipeline with the DirectPipelineRunner (similar to how the writeDataToDatastore step is run in the DatastoreWordCount.java example); that bypasses the query-splitting stage. One way to set the runner is shown below.
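
A minimal sketch (you can equally pass --runner=DirectPipelineRunner on the command line instead of setting it in code):

import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

// Run locally in-process; per the above, the local runner
// does not go through the query-splitting stage.
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);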

And the second one would be something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(AvroIO.Read.named("ReadEntitiesFromGCS")
                          .from("gs://your-bucket/path/to/temp.avro")
                          .withSchema(MyEntity.class))
        .apply(/* ...the rest of your pipeline... */);
pipeline.run();

I realize this isn't a great workaround; stay tuned for updates to the Datastore library.

Answered 2015-03-16T23:30:05.223