Naman Kumar

Reputation: 19

How can I prevent an empty file from being written in a Dataflow pipeline when the collection size is 0?

I have a Dataflow pipeline that parses a file. If it finds any incorrect records, it writes them to a GCS bucket. But when the input file contains no errors, TextIO still writes an empty file (containing only the header) to the bucket.

How can I skip this step when the PCollection is empty?

errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
             .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
             .withoutSharding()
             .withSuffix(".txt")
             .withShardNameTemplate("-SSS")
             .withNumShards(1));
        

Upvotes: 0

Views: 811

Answers (2)

Valentyn

Reputation: 565

Beam TextIO added support for skipIfEmpty() in 2.40.0, see: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.TypedWrite.html#skipIfEmpty--
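With Beam 2.40.0 or later, the write from the question can stay almost unchanged, with skipIfEmpty() appended. The following is a minimal sketch, assuming that Beam version and the DirectRunner on the classpath; the class name SkipIfEmptyExample, the run helper, and the output prefix are hypothetical. withoutSharding() is dropped because it is equivalent to withNumShards(1).withShardNameTemplate(""), and the question overrides the template with "-SSS" anyway. How skipIfEmpty() interacts with a fixed shard count may differ between Beam versions, so verify the behavior on yours.

```java
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

final class SkipIfEmptyExample {
  // Hypothetical helper: runs a tiny pipeline that writes `records`
  // (possibly none) to files starting with `outputPrefix`.
  static void run(List<String> records, String outputPrefix) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Stand-in for the question's errorRecords; the coder is explicit
    // so that an empty list still has a known element type.
    PCollection<String> errorRecords =
        p.apply(Create.of(records).withCoder(StringUtf8Coder.of()));

    errorRecords.apply("WritingErrorRecords",
        TextIO.write()
            .to(outputPrefix)
            .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
            .withSuffix(".txt")
            .withShardNameTemplate("-SSS")
            .withNumShards(1)
            // Requires Beam 2.40.0+: write no files if the input is empty.
            .skipIfEmpty());

    p.run().waitUntilFinish();
  }
}
```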

Upvotes: 0

robertwb

Reputation: 5104

By default, TextIO.write() always writes at least one shard, even if it is empty. Since you are writing to a single shard anyway, you can get around this behavior by doing the write manually in a DoFn that takes the to-be-written elements as a side input, e.g.:

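import java.util.List;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

// Materialize all error records as a single List side input.
final PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
    View.<String>asList());

// Your "main" PCollection is a PCollection with a single element,
// so the DoFn will get invoked exactly once.
p.apply(Create.of("whatever"))
 // The side input is your error records.
 .apply(ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        List<String> errors = c.sideInput(errorRecordsView);
        if (!errors.isEmpty()) {
          // Open the file manually and write all the errors to it.
        }
        // With no errors, nothing is opened, so no file is created.
      }
  }).withSideInputs(errorRecordsView));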
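The "open the file manually" step can be a small helper that only opens the output when there is something to write. This is a minimal sketch in plain Java; ErrorFileWriter, ChannelOpener and writeIfNonEmpty are hypothetical names, not Beam APIs. Taking a channel opener instead of a path keeps the helper independent of where the file lives and guarantees nothing is created for an empty list.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical helper: writes a header plus error lines, only if there are errors. */
final class ErrorFileWriter {

  /** Opens the destination lazily, so no file exists until we decide to write. */
  @FunctionalInterface
  interface ChannelOpener {
    WritableByteChannel open() throws IOException;
  }

  /** Returns true if a file was written, false if {@code errors} was empty. */
  static boolean writeIfNonEmpty(List<String> errors, String header, ChannelOpener opener)
      throws IOException {
    if (errors.isEmpty()) {
      return false; // Skip entirely: the opener is never called.
    }
    // Closing the writer also closes the underlying stream and channel.
    try (Writer writer = new OutputStreamWriter(
        Channels.newOutputStream(opener.open()), StandardCharsets.UTF_8)) {
      writer.write(header);
      writer.write('\n');
      for (String error : errors) {
        writer.write(error);
        writer.write('\n');
      }
    }
    return true;
  }
}
```

Inside the DoFn, the opener could be () -> FileSystems.create(FileSystems.matchNewResource(errorFilePath, false), MimeTypes.TEXT), using Beam's org.apache.beam.sdk.io.FileSystems and org.apache.beam.sdk.util.MimeTypes so that gs:// paths work. Here errorFilePath is a hypothetical String built from options.getBucketPath() before the DoFn is constructed, and processElement would need to declare throws IOException.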

Being able to skip empty output with the native Beam writes is a reasonable request, and it is now supported in recent Beam releases via skipIfEmpty().

Upvotes: 0
