Ismail Addou

Reputation: 393

SerializableCoder cannot be cast to BeamRecordCoder

I prepared a PCollection<BeamRecord> from a file containing JSON objects, using the Beam SQL SDK.

The code below parses and maps the JSON lines to ChatHistory objects, then converts the mapped objects to BeamRecords. Finally, I try to apply BeamSql to the resulting PCollection<BeamRecord>, but I get the exception SerializableCoder cannot be cast to BeamRecordCoder.

  PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));

  // Convert them to BeamRecords with the same schema as defined above via a DoFn.
  PCollection<BeamRecord> apps = json_objects.apply(
      ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          List<String> fields_list = new ArrayList<String>(Arrays.asList(
              "conversation_id", "message_type", "message_date", "message", "message_auto_id"));
          List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(
              Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR));
          BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
          BeamRecord br = new BeamRecord(
              brtype,
              c.element().conversation_id,
              c.element().message_type,
              c.element().message_date,
              c.element().message,
              c.element().message_auto_id);
          c.output(br);
        }
      }));

  return apps.apply(
      BeamSql.query(
          "SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION"));

Here is the generated stack trace:

java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables (BeamSql.java:173)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:153)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollectionTuple.apply (PCollectionTuple.java:160)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:246)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:186)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:123)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:1)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3.main (trial3.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)

I saw a similar post, but it still didn't fix my error: Running BeamSql WithoutCoder or Making Coder Dynamic

Best regards!

Upvotes: 1

Views: 1477

Answers (1)

Anton

Reputation: 2539

Ismail, in your case calling .setCoder() should work. The underlying problem is that Beam cannot infer a specialized coder for the output of your DoFn, so it falls back to SerializableCoder, while BeamSql requires the input collection to use a BeamRecordCoder.

I would extract the row type out of the ParDo and then apply its record coder to apps before applying the SQL query:

  PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));
  
  // Create a row type first:
  List<String> fields_list= new ArrayList<String>(Arrays.asList("conversation_id","message_type","message_date","message","message_auto_id"));
  List<Integer> types_list= new ArrayList<Integer>(Arrays.asList(Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR));
  final BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
  
  // Convert them to BeamRecords with the same schema as defined above via a DoFn.
  PCollection<BeamRecord> apps = json_objects.apply(
      ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            BeamRecord br = new BeamRecord(
                          brtype, 
                          c.element().conversation_id, 
                          c.element().message_type, 
                          c.element().message_date,
                          c.element().message, 
                          c.element().message_auto_id
                          );
            c.output(br);
        }
      })); 
  
  return apps
    .setCoder(brtype.getRecordCoder())
    .apply(
      BeamSql
          .query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION")
   ); 

A couple of examples:

  • This example sets the coder by using Create.withCoder() which does the same thing;
  • This example filters and converts from Events using the ToRow.parDo() and then also sets the coder which is specified here;

Please note that BeamRecord has since been renamed to Row, and a few other changes are reflected in the examples above.
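For reference, here is a rough sketch of the same conversion using the renamed Row and Schema classes together with SqlTransform, as found in newer Beam releases. This is an illustrative adaptation of the question's code, not a drop-in snippet; exact signatures may vary slightly between versions:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

// Define the schema once, outside the conversion (the modern
// equivalent of BeamRecordSqlType).
Schema schema = Schema.builder()
    .addStringField("conversation_id")
    .addStringField("message_type")
    .addStringField("message_date")
    .addStringField("message")
    .addStringField("message_auto_id")
    .build();

// Convert ChatHistory elements to Rows and attach the schema, so the
// SQL transform finds a row coder instead of a SerializableCoder.
PCollection<Row> rows = json_objects
    .apply(MapElements.into(TypeDescriptors.rows())
        .via(ch -> Row.withSchema(schema)
            .addValues(ch.conversation_id, ch.message_type,
                       ch.message_date, ch.message, ch.message_auto_id)
            .build()))
    .setRowSchema(schema);

return rows.apply(SqlTransform.query(
    "SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION"));
```

Here setRowSchema(schema) plays the role that setCoder(brtype.getRecordCoder()) plays in the answer above: it tells Beam how to encode the rows so the SQL transform can consume them.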

Upvotes: 3
