Ingo

Reputation: 1572

How to report invalid data while processing data with Google dataflow?

I am looking at the documentation and the provided examples to find out how I can report invalid data while processing data with Google's dataflow service.

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
 .apply(new SomeTransformation())
 .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
p.run();

In addition to the actual input and output, I want to produce a second output file that contains records that are considered invalid (e.g. missing data, malformed data, values that are too high). I want to troubleshoot those records and process them separately.

How can I redirect those invalid records into a separate output?

Upvotes: 2

Views: 899

Answers (2)

Frances

Reputation: 4021

Robert's suggestion of using sideOutputs is great, but note that this only works if the bad data is identified by your ParDos. There currently isn't a way to identify bad records encountered during initial decoding (where the error occurs in Coder.decode). We've got plans to work on that soon.
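One possible workaround in the meantime, sketched below under the assumption that the input is line-delimited text: read the raw lines as strings and do the decoding yourself inside a ParDo, so a parse failure can be caught and routed to a side output instead of failing the read. MyRecord and parseRecord are hypothetical stand-ins for your own record type and parser.

final TupleTag<MyRecord> parsedTag = new TupleTag<>("parsed");
final TupleTag<String> unparsableTag = new TupleTag<>("unparsable");

PCollectionTuple parsed = p
    .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
    .apply(ParDo
        .of(new DoFn<String, MyRecord>() {
              @Override
              public void processElement(ProcessContext c) {
                try {
                  // Hypothetical parser that throws on malformed input.
                  c.output(parseRecord(c.element()));
                } catch (Exception e) {
                  // Route the raw line to a side output for later inspection.
                  c.sideOutput(unparsableTag, c.element());
                }
              }
            })
        .withOutputTags(parsedTag, TupleTagList.of(unparsableTag)));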

Upvotes: 0

robertwb

Reputation: 5104

You can use PCollectionTuples to return multiple PCollections from a single transform. For example,

TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");

Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
   .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
   .apply(new SomeTransformation());

all.get(mainOutput)
   .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
   .apply(TextIO.Write.named("WriteMissingData").to(...));
...

PCollectionTuples can either be built up directly out of existing PCollections, or emitted from ParDo operations with side outputs, e.g.

PCollectionTuple partitioned = input.apply(ParDo
    .of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
             if (checkOK(c.element())) {
                 // Shows up in partitioned.get(mainOutput).
                 c.output(...);
             } else if (hasMissingData(c.element())) {
                 // Shows up in partitioned.get(missingData).
                 c.sideOutput(missingData, c.element());
             } else {
                 // Shows up in partitioned.get(badValues).
                 c.sideOutput(badValues, c.element());
             }
          }
        })
    .withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));

Note that in general the various side outputs need not have the same type, and data can be emitted any number of times to any number of side outputs (rather than the strict partitioning we have here).
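As an illustration, here is a fragment (a variant of the ParDo above) where the bad-values side output has a different element type than the main output, pairing each offending element with a hypothetical reason string:

final TupleTag<String> mainOutput = new TupleTag<>("main");
final TupleTag<KV<String, String>> badValues = new TupleTag<>("bad");
...
} else {
    // This side output carries KV<String, String> rather than String:
    // the offending element together with a reason.
    c.sideOutput(badValues, KV.of(c.element(), "value out of range"));
}
...
PCollection<KV<String, String>> bad = partitioned.get(badValues);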

Your SomeTransformation class could then look something like

class SomeTransformation extends PTransform<PCollection<String>,
                                            PCollectionTuple> {
  public PCollectionTuple apply(PCollection<String> input) {
    // Filter into good and bad data.
    PCollectionTuple partitioned = ...
    // Process the good data.
    PCollection<String> processed =
        partitioned.get(mainOutput)
                   .apply(...)
                   .apply(...)
                   ...;
    // Repackage everything into a new output tuple.
    return PCollectionTuple.of(mainOutput, processed)
                           .and(missingData, partitioned.get(missingData))
                           .and(badValues, partitioned.get(badValues));
  }
}

Upvotes: 3
