PCollectionTuples を使用して、1 つの変換から複数の PCollection を返すことができます。例えば、
TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");
Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
.apply(new SomeTransformation());
all.get(mainOutput)
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
.apply(TextIO.Write.named("WriteMissingData").to(...));
...
PCollectionTuples は、既存の PCollection から直接構築することも、ParDo オペレーションから副出力を使用して生成することもできます。
PCollectionTuple partitioned = input.apply(ParDo
.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
if (checkOK(c.element()) {
// Shows up in partitioned.get(mainOutput).
c.output(...);
} else if (hasMissingData(c.element())) {
// Shows up in partitioned.get(missingData).
c.sideOutput(missingData, c.element());
} else {
// Shows up in partitioned.get(badValues).
c.sideOutput(badValues, c.element());
}
}
})
.withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));
一般に、さまざまな副出力が同じタイプである必要はなく、データは任意の数の副出力に何度でも発行できることに注意してください (ここでの厳密な分割ではありません)。
SomeTransformation クラスは次のようになります
class SomeTransformation extends PTransform<PCollection<String>,
PCollectionTuple> {
public PCollectionTuple apply(PCollection<String> input) {
// Filter into good and bad data.
PCollectionTuple partitioned = ...
// Process the good data.
PCollection<String> processed =
partitioned.get(mainOutput)
.apply(...)
.apply(...)
...;
// Repackage everything into a new output tuple.
return PCollectionTuple.of(mainOutput, processed)
.and(missingData, partitioned.get(missingData))
.and(badValues, partitioned.get(badValues));
}
}