0

パイプラインをローカルで実行すると、次の例外が発生します。クラウド実行のために提出する場合も例外ではありません。

ありがとう、ジェナディ

INFO: Executing pipeline using the DirectPipelineRunner.
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for GroupedValues [GroupedValues]
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:606)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:583)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:327)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
at app.Main.main(Main.java:124)

コードの概要は基本的に次のとおりです。

PCollection<KV<MyKey, Iterable<MyValue>>> groupedByMyKey = ...
PCollection<KV<MyKey, MyAggregated>> aggregated = groupedByMyKey.apply(
        Combine.<MyKey, MyValue, MyAggregated>groupedValues(new Aggregator()));

アグリゲーター クラスの拡張CombineFn<MyValue, List<MyValue>, MyAggregated>

4

2 に答える 2

1

これをトリガーするコード スニペットを共有できますか? GroupedValues は、さまざまな結合変換内でよく使用される PTransform であるため、Min、Max などの使用によるものである可能性があります。

このエラーは、DirectPipelineRunner が GroupedValues を評価する方法を認識していないことを意味します。ただし、実行前に ParDo に展開されているはずなので、これは予想外です。

于 2014-12-29T16:46:30.093 に答える
1

この行動の理由がわかりました

コマンドライン引数を使用してリモートモードで実行し(--runner=BlockingDataflowPipelineRunner)、ローカルで実行するように強制しました

PipelineRunner<?> runner = DirectPipelineRunner.fromOptions(options);
runner.run(p);

これらの行を削除して--runner=DirectPipelineRunner引数を使用するだけで、期待どおりに機能しました。

于 2014-12-30T09:56:13.837 に答える