Reputation: 75
We are creating a Dataflow pipeline that reads data from Postgres and writes it to a Parquet file. We are using org.apache.beam.sdk.io.jdbc to read and org.apache.beam.sdk.io.parquet to write the file. ParquetIO.Sink allows you to write a PCollection of GenericRecord into a Parquet file (from here: https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html).
This is my code so far:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

Schema schema = SchemaBuilder
        .record("table").namespace("org.apache.avro.ipc")
        .fields()
        .name("id").type("int").noDefault()
        .name("number").type("int").noDefault()
        .name("name").type().stringType().noDefault()
        .name("password").type().stringType().noDefault()
        .endRecord();

p.apply(JdbcIO.<GenericRecord>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
                .withUsername("username")
                .withPassword("password"))
        .withQuery("select * from table")
        .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
            GenericRecord record = new GenericData.Record(schema);
            ResultSetMetaData metadata = resultSet.getMetaData();
            int columnsNumber = metadata.getColumnCount();
            for (int i = 0; i < columnsNumber; i++) {
                String columnValue = resultSet.getString(i + 1);
                record.put(i, columnValue);
            }
            return record;
        })
        .withCoder(AvroCoder.of(schema)))
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
        .to("somethingg.parquet"));

p.run();
And I get this error:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn@4393593c, mainOutputTag=Tag<output>, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:564)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:212)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:705)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:208)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:187)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:125)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:155)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:651)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:666)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:280)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:258)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at com.click.example.StarterPipeline.main(StarterPipeline.java:196)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
... 26 more
Upvotes: 2
Views: 1459
Reputation: 2539
The error is pretty much explained in the stack trace: Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema.

withRowMapper() takes a serializable RowMapper<> functional interface, which is serialized and deserialized by Beam when it needs to. However, in your lambda you also use the instance of Schema that you define outside of the lambda (a closure). So when serializing your lambda, Java will also have to serialize the schema, because it's used there. But Schema is not serializable, so it fails.
There are a few workarounds I can think of:
create the schema inside the lambda:
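For example, a minimal sketch reusing the RowMapper<> from the question, with the schema built inside the lambda instead of captured from outside:

.withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
    // Build the schema here so that no non-serializable Schema instance
    // is captured from the enclosing scope
    Schema schema = SchemaBuilder
            .record("table").namespace("org.apache.avro.ipc")
            .fields()
            .name("id").type("int").noDefault()
            .name("number").type("int").noDefault()
            .name("name").type().stringType().noDefault()
            .name("password").type().stringType().noDefault()
            .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    int columnsNumber = resultSet.getMetaData().getColumnCount();
    for (int i = 0; i < columnsNumber; i++) {
        record.put(i, resultSet.getString(i + 1));
    }
    return record;
})

Note that this rebuilds the schema for every row; if that ever becomes a bottleneck, you could cache it in a static field.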
serialize the schema into a serializable object (e.g. a JSON string) outside of the lambda, and then deserialize it within the lambda:
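A sketch of that approach: Schema.toString() returns the schema's JSON representation, and a String is serializable, so the lambda can capture the JSON instead of the Schema itself:

// Outside of the lambda: a plain String, which is serializable
final String schemaJson = schema.toString();

// The RowMapper<> then only captures schemaJson
JdbcIO.RowMapper<GenericRecord> rowMapper = resultSet -> {
    // Deserialize the schema back from its JSON form
    Schema rowSchema = new Schema.Parser().parse(schemaJson);
    GenericRecord record = new GenericData.Record(rowSchema);
    int columnsNumber = resultSet.getMetaData().getColumnCount();
    for (int i = 0; i < columnsNumber; i++) {
        record.put(i, resultSet.getString(i + 1));
    }
    return record;
};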
find/write a serializable Schema implementation and use it in the RowMapper<>:
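For example, a hypothetical SerializableSchema wrapper (not part of Beam or Avro) that stores the JSON form and re-parses it lazily after deserialization:

import java.io.Serializable;
import org.apache.avro.Schema;

public class SerializableSchema implements Serializable {
    private final String schemaJson;
    // transient: Schema itself is not serializable, so it is rebuilt on demand
    private transient Schema schema;

    public SerializableSchema(Schema schema) {
        this.schemaJson = schema.toString();
        this.schema = schema;
    }

    public Schema get() {
        if (schema == null) {
            // After deserialization the transient field is null; re-parse it
            schema = new Schema.Parser().parse(schemaJson);
        }
        return schema;
    }
}

Capturing an instance of this wrapper in the lambda is then safe, and you would call get() wherever the code currently uses schema directly.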
I think it's perfectly fine to create a new instance of the schema in the lambda unless it causes problems.
Upvotes: 1