bigdataclown

Reputation: 139

Google Dataflow: java.lang.IllegalArgumentException: Cannot setCoder(null)

I am trying to build a custom sink for unzipping files.

Having this simple code:

public static class ZipIO{    
  public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<String> {

    private static final long serialVersionUID = -7414200726778377175L;
    private final String unzipTarget;

      public Sink withDestinationPath(String s){
         if (s != null && !s.isEmpty()) { // string equality via != is a bug in Java; compare contents
             return new Sink(s);
         }
         else {
             throw new IllegalArgumentException("must assign destination path");
         }

      }

      protected Sink(String path){
          this.unzipTarget = path;
      }

      @Override
      public void validate(PipelineOptions po){
          if(unzipTarget==null){
              throw new RuntimeException();
          }
      } 

      @Override
      public ZipFileWriteOperation createWriteOperation(PipelineOptions po){
          return new ZipFileWriteOperation(this);
      }

  }

  private static class ZipFileWriteOperation extends WriteOperation<String, UnzipResult>{

    private static final long serialVersionUID = 7976541367499831605L;
    private final ZipIO.Sink sink;

      public ZipFileWriteOperation(ZipIO.Sink sink){
          this.sink = sink;
      }



      @Override
      public void initialize(PipelineOptions po) throws Exception{

      }

      @Override
      public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception {
         long totalFiles = 0;
         for(UnzipResult r:writerResults){
             totalFiles +=r.filesUnziped;
         }
         LOG.info("Unzipped {} Files",totalFiles);
      }  

      @Override
      public ZipIO.Sink getSink(){
          return sink;
      }

      @Override
      public ZipWriter createWriter(PipelineOptions po) throws Exception{
          return new ZipWriter(this);
      }

  }

  private static class ZipWriter extends Writer<String, UnzipResult>{
      private final ZipFileWriteOperation writeOp;
      public long totalUnzipped = 0;

      ZipWriter(ZipFileWriteOperation writeOp){
          this.writeOp = writeOp;
      }

      @Override
      public void open(String uID) throws Exception{
      }

      @Override
      public void write(String p){
            System.out.println(p);
      }

      @Override
      public UnzipResult close() throws Exception{
          return new UnzipResult(this.totalUnzipped);
      }

      @Override
      public ZipFileWriteOperation getWriteOperation(){
          return writeOp;
      }


  }

  private static class UnzipResult implements Serializable{  
    private static final long serialVersionUID = -8504626439217544799L;
    public long filesUnziped=0;      
      public UnzipResult(long filesUnziped){
          this.filesUnziped=filesUnziped;
      }
  }
}

The processing fails with error:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot setCoder(null)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.setCoder(TypedPValue.java:67)
    at com.google.cloud.dataflow.sdk.values.PCollection.setCoder(PCollection.java:150)
    at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:380)
    at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:112)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2118)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2099)
    at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:465)
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:463)
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291)
    at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)
    at com.mcd.de.tlogdataflow.StarterPipeline.main(StarterPipeline.java:93)

Any help is appreciated.

Thanks & BR Philipp

Upvotes: 1

Views: 405

Answers (1)

Kenn Knowles

Reputation: 6023

This crash is caused by a bug in the Dataflow Java SDK (the setCoder call in Write.Bound.createWrite, visible in the stack trace above) which was also present in the Apache Beam (incubating) Java SDK.

The method Sink.WriteOperation#getWriterResultCoder() must always be overridden, but we failed to mark it abstract. It is fixed in Beam, but unchanged in the Dataflow SDK. You should override this method and return an appropriate coder.
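To see why the missing override produces this particular exception: Write asks the WriteOperation for its result coder and passes it straight to PCollection.setCoder, which rejects null. The following is a hypothetical mini-reproduction of that failure mode with stand-in classes (TypedPValue, Coder, and DemoWriteOperation here are illustrative stubs, not the real SDK types):

```java
// Illustrative stand-ins for the SDK classes involved in the crash.
class Coder<T> {}

class TypedPValue<T> {
    private Coder<T> coder;

    // Mirrors the SDK's behavior: setCoder rejects null outright.
    void setCoder(Coder<T> c) {
        if (c == null) {
            throw new IllegalArgumentException("Cannot setCoder(null)");
        }
        this.coder = c;
    }
}

class DemoWriteOperation {
    // Stands in for the non-abstract default, which effectively
    // hands back null unless the user overrides it.
    Coder<Long> getWriterResultCoder() {
        return null;
    }
}

class SetCoderDemo {
    public static void main(String[] args) {
        TypedPValue<Long> writerResults = new TypedPValue<>();
        // This is the shape of the call chain inside Write.Bound.createWrite:
        writerResults.setCoder(new DemoWriteOperation().getWriterResultCoder());
        // throws IllegalArgumentException: Cannot setCoder(null)
    }
}
```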

You have some options for the coder:

  1. Write your own small coder class, wrapping one of VarLongCoder or BigEndianLongCoder.
  2. Just use a long instead of the UnzipResult structure, so you can use those coders as-is.
  3. Use SerializableCoder.of(UnzipResult.class); less advisable because of the larger encoded size.
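For option 1, the coder only has to encode UnzipResult's single long counter. The SDK classes are omitted below; this self-contained sketch (class and method names are my own) shows just the encode/decode core that such a coder would delegate to, written big-endian as BigEndianLongCoder would produce it. A real implementation would wrap this in a subclass of the SDK's coder base class and return it from getWriterResultCoder().

```java
import java.io.*;

// Same shape as the UnzipResult in the question: one long field.
class UnzipResult implements Serializable {
    public long filesUnzipped;
    public UnzipResult(long filesUnzipped) { this.filesUnzipped = filesUnzipped; }
}

// Hypothetical helper showing the encode/decode logic of a small
// UnzipResult coder (option 1), independent of the SDK.
class UnzipResultCoderSketch {
    // Encode: write the single long field big-endian (DataOutputStream
    // writes longs big-endian, matching BigEndianLongCoder's format).
    static byte[] encode(UnzipResult value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeLong(value.filesUnzipped);
        return bos.toByteArray();
    }

    // Decode: read the long back and rebuild the result object.
    static UnzipResult decode(byte[] bytes) throws IOException {
        return new UnzipResult(
            new DataInputStream(new ByteArrayInputStream(bytes)).readLong());
    }

    public static void main(String[] args) throws IOException {
        UnzipResult roundTripped = decode(encode(new UnzipResult(42)));
        System.out.println(roundTripped.filesUnzipped); // prints 42
    }
}
```

The fixed 8-byte encoding is far smaller than what option 3's SerializableCoder produces, which is why the Java-serialization route is listed last.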

Upvotes: 0
