2

Google のデータフロー サービスでデータを処理しているときに無効なデータを報告する方法を見つけるために、ドキュメント提供されている例を調べています。

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
 .apply(new SomeTransformation())
 .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
p.run();

実際の入力/出力に加えて、無効と見なされるレコードを含む 2 番目の出力ファイルを作成したいと考えています (データの欠落、不正な形式のデータ、値が高すぎるなど)。それらのレコードをトラブルシューティングして、個別に処理したいと考えています。

  • 入力: gs://.../input.csv
  • 出力: gs://.../output.csv
  • 無効なレコードのリスト: gs://.../invalid.csv

これらの無効なレコードを別の出力にリダイレクトするにはどうすればよいですか?

4

2 に答える 2

3

PCollectionTuples を使用して、1 つの変換から複数の PCollection を返すことができます。例えば、

TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");

Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
   .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
   .apply(new SomeTransformation());

all.get(mainOutput)
   .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
   .apply(TextIO.Write.named("WriteMissingData").to(...));
...

PCollectionTuples は、既存の PCollection から直接構築することも、ParDo オペレーションから副出力を使用して生成することもできます。

PCollectionTuple partitioned = input.apply(ParDo
    .of(new DoFn<String, String>() {
          public void processElement(ProcessContext c) {
             if (checkOK(c.element()) {
                 // Shows up in partitioned.get(mainOutput).
                 c.output(...);
             } else if (hasMissingData(c.element())) {
                 // Shows up in partitioned.get(missingData).
                 c.sideOutput(missingData, c.element());
             } else {
                 // Shows up in partitioned.get(badValues).
                 c.sideOutput(badValues, c.element());
             }
          }
        })
    .withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));

一般に、さまざまな副出力が同じタイプである必要はなく、データは任意の数の副出力に何度でも発行できることに注意してください (ここでの厳密な分割ではありません)。

SomeTransformation クラスは次のようになります

class SomeTransformation extends PTransform<PCollection<String>,
                                            PCollectionTuple> {
  public PCollectionTuple apply(PCollection<String> input) {
    // Filter into good and bad data.
    PCollectionTuple partitioned = ...
    // Process the good data.
    PCollection<String> processed =
        partitioned.get(mainOutput)
                   .apply(...)
                   .apply(...)
                   ...;
    // Repackage everything into a new output tuple.
    return PCollectionTuple.of(mainOutput, processed)
                           .and(missingData, partitioned.get(missingData))
                           .and(badValues, partitioned.get(badValues));
  }
}
于 2015-01-02T21:46:43.453 に答える
0

sideOutputs を使用するという Robert の提案は素晴らしいですが、これは、ParDos によって不良データが識別された場合にのみ機能することに注意してください。現在、最初のデコード中にヒットした不良レコードを特定する方法はありません (エラーが Coder.decode でヒットした場合)。すぐにそれに取り組む計画があります。

于 2015-01-03T00:29:38.020 に答える