I prepared a PCollection<BeamRecord> object from a file containing JSON objects, using the Beam SQL SDK. The code below parses the JSON lines and maps them to ChatHistory objects, then converts the mapped objects to BeamRecord. Finally, I try to apply BeamSql to the returned PCollection<BeamRecord>, but I get the exception "SerializableCoder cannot be cast to BeamRecordCoder".
PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));
// Convert them to BeamRecords with the same schema as defined above via a DoFn.
PCollection<BeamRecord> apps = json_objects.apply(
    ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            List<String> fields_list = new ArrayList<String>(Arrays.asList("conversation_id", "message_type", "message_date", "message", "message_auto_id"));
            List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR));
            BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
            BeamRecord br = new BeamRecord(
                brtype,
                c.element().conversation_id,
                c.element().message_type,
                c.element().message_date,
                c.element().message,
                c.element().message_auto_id
            );
            c.output(br);
        }
    }));
return apps.apply(
    BeamSql.query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION")
);
Here is the generated stack trace:
java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables(BeamSql.java:173)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:153)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
    at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand(BeamSql.java:246)
    at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand(BeamSql.java:186)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at com.mdm.trial.trial3$JsonParse.expand(trial3.java:123)
    at com.mdm.trial.trial3$JsonParse.expand(trial3.java:1)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at com.mdm.trial.trial3.main(trial3.java:160)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)
I saw a similar post, "Running BeamSql WithoutCoder or Making Coder Dynamic", but it still doesn't fix my error.
Best regards!
Ismail, in your case using .setCoder() should work.
I would try extracting the row type out of the ParDo, and then setting its record coder on apps before applying the SQL query:
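// Imports this snippet relies on (the pre-Row, BeamRecord-based Beam SQL API):
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;

PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));
// Create a row type first, once, outside the DoFn:
List<String> fields_list = new ArrayList<String>(Arrays.asList("conversation_id", "message_type", "message_date", "message", "message_auto_id"));
List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR));
final BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
// Convert them to BeamRecords with the same schema as defined above via a DoFn.
PCollection<BeamRecord> apps = json_objects.apply(
    ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            BeamRecord br = new BeamRecord(
                brtype,
                c.element().conversation_id,
                c.element().message_type,
                c.element().message_date,
                c.element().message,
                c.element().message_auto_id
            );
            c.output(br);
        }
    }));
// Set the record coder explicitly, so BeamSql sees a BeamRecordCoder instead of an inferred SerializableCoder.
return apps
    .setCoder(brtype.getRecordCoder())
    .apply(
        BeamSql.query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION")
    );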
A couple of examples:
- Create.withCoder(), which does the same thing;
- Events, which is built using ToRow.parDo() and then also sets the coder, which is specified here.
Please note that BeamRecord has been renamed to Row, and a few other changes are reflected in the examples above.
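For intuition on why the explicit coder matters, here is a toy model in plain Java. It is a hypothetical sketch, not Beam's API: ToyCoder, FallbackSerializableCoder, RecordCoder, ToyRecord, ToyPCollection and registerTable are made-up names. Going by the stack trace, BeamSql's table registration (BeamSql.java:173) downcasts the input's coder to BeamRecordCoder. When no coder has been set, Beam has to infer one, and the exception message shows the inferred coder was a SerializableCoder, so the downcast fails. Calling setCoder(brtype.getRecordCoder()) makes the collection report the schema-aware coder instead.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model, NOT Beam's API: it only mimics the coder downcast
// that BeamSql appears to perform when registering the input as a table.
public class CoderCastSketch {

    // Stand-in for a coder for elements of type T.
    interface ToyCoder<T> {}

    // Generic fallback, analogous to SerializableCoder: used when nothing better is known.
    static final class FallbackSerializableCoder<T extends Serializable> implements ToyCoder<T> {}

    // Schema-aware coder, analogous to what brtype.getRecordCoder() returns.
    static final class RecordCoder implements ToyCoder<ToyRecord> {
        final List<String> fieldNames;

        RecordCoder(List<String> fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    // Serializable record type, so the fallback coder is applicable to it.
    static final class ToyRecord implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Minimal collection: an explicitly set coder wins; otherwise the fallback is "inferred".
    static final class ToyPCollection {
        private ToyCoder<ToyRecord> explicitCoder;

        ToyPCollection setCoder(ToyCoder<ToyRecord> coder) {
            this.explicitCoder = coder;
            return this;
        }

        ToyCoder<ToyRecord> getCoder() {
            return explicitCoder != null ? explicitCoder : new FallbackSerializableCoder<ToyRecord>();
        }
    }

    // Mirrors the table registration step: it needs the field names, so it
    // downcasts the coder. This throws ClassCastException for the fallback coder.
    static List<String> registerTable(ToyPCollection input) {
        RecordCoder coder = (RecordCoder) input.getCoder();
        return coder.fieldNames;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList(
            "conversation_id", "message_type", "message_date", "message", "message_auto_id");

        try {
            registerTable(new ToyPCollection());
            System.out.println("unexpected: no exception");
        } catch (ClassCastException e) {
            System.out.println("without setCoder: ClassCastException");
        }

        ToyPCollection withCoder = new ToyPCollection().setCoder(new RecordCoder(fields));
        System.out.println("with setCoder: " + registerTable(withCoder));
    }
}
```

Running main prints "without setCoder: ClassCastException" followed by "with setCoder: [conversation_id, message_type, message_date, message, message_auto_id]". The point of the model is only that a consumer which needs schema information cannot recover it from a generic fallback coder, so the schema-aware coder has to be attached to the collection before the SQL transform is applied.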