私の単純な Dataflow パイプラインは、1 つのプロジェクトの Datastore から別のプロジェクトへ複数の種類 (Kind) をコピーするもので、ほとんどの場合は正常に動作します。しかし、特定の種類 (全体の約 5%) では、常に以下のエラーが発生します。
Dataflow は約 75 秒の遅延を挟んで 4 ~ 8 回再試行し、その後パイプラインが失敗します。
これを診断して解決するにはどうすればよいですか?
編集: 答えは次のとおりです。(1) Dataflow で使用されている Datastore ライブラリにバグがありました。このバグが修正されると、根本的な原因を確認できるようになります。(2) このライブラリがエンティティを書き込む際のデフォルトのバッチ サイズは 500 (これは上限値でもあります) であり、その結果、リクエストが Datastore API の制限である 10 MB を超えていました。
(非常に単純な) パイプラインは次のようになります。
// Build a Datastore query whose only constraint is the kind name held in
// kindName; no filters, ordering or limits are added here.
Query.Builder qb = Query.newBuilder();
qb.addKindBuilder().setName(kindName);
Query query = qb.build();
// Source transform: runs the query against the project identified by inputProject.
Read dsRead = DatastoreIO.v1().read().withProjectId(inputProject).withQuery(query);
// Sink transform: writes entities into the project identified by outputProject.
Write dsWrite = DatastoreIO.v1().write().withProjectId(outputProject);
// Wire the three steps together: read -> clone-entity -> write-to-ds.
PCollection<Entity> sourceEntities = pipeline.apply("read", dsRead);
Bound<Entity, Entity> entityFromSrcToTarget = ParDo.of(new EntityDoFn());/*Simple DoFn that copies Entities for insertion to target*/
PCollection<Entity> clonedEntities = sourceEntities.apply("clone-entity", entityFromSrcToTarget);
// NOTE(review): the stack traces quoted with this snippet fail inside the write
// step (DatastoreWriterFn.flushBatch -> Datastore.commit), so this is presumably
// the stage to inspect first when a kind fails to copy.
clonedEntities.apply("write-to-ds", dsWrite);
最初のスタックトレース
com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at
com.google.datastore.v1.client.RemoteRpc.makeException(RemoteRpc.java:126) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:95) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) Caused by: java.io.IOException: insufficient data written at
sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3500) at
com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81) at
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:87) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158) at
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196) at
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47) at
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65) at
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745)
また、次のスタックトレースも出力されます。
(9908b474b1492772): java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException:
com.google.cloud.dataflow.sdk.util.UserCodeException:
com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:283) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:507) at
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:125) at
com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) at
com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202) at
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:143) at
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745)