Michael Julian
Michael Julian

Reputation: 75

How to create pipeline from postgres to parquet?

We are creating a dataflow pipeline, we will read the data from postgres and write it to a parquet file. we are using the org.apache.beam.sdk.io.jdbc to read and org.apache.beam.sdk.io.parquet package to write a file. ParquetIO.Sink allows you to write a PCollection of GenericRecord into a Parquet file (from here https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html).

this is my code so far:

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

Schema schema = SchemaBuilder
                .record("table").namespace("org.apache.avro.ipc")
                .fields()
                .name("id").type("int").noDefault()
                .name("number").type("int").noDefault()
                .name("name").type().stringType().noDefault()
                .name("password").type().stringType().noDefault()

p.apply(JdbcIO.<GenericRecord> read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
                    .withUsername("username")
                    .withPassword("password"))
                .withQuery("select * from table")
                .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
                        GenericRecord record = new GenericData.Record(schema);
                        ResultSetMetaData metadata = resultSet.getMetaData();
                        int columnsNumber = metadata.getColumnCount();
                        for(int i=0; i<columnsNumber; i++) {
                            String columnValue = resultSet.getString(i+1);
                            record.put(i, columnValue);
                        }
                    return record;
                })
                .withCoder(AvroCoder.of(schema)))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
                    .to("somethingg.parquet")
                    );
p.run()

and i get this error

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn@4393593c, mainOutputTag=Tag<output>, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:564)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:212)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:705)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:208)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:187)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:125)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:155)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:651)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:666)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:280)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:258)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at com.click.example.StarterPipeline.main(StarterPipeline.java:196)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    ... 26 more

Upvotes: 2

Views: 1459

Answers (1)

Anton
Anton

Reputation: 2539

The error is pretty much explained in the stack trace: Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema.

withRowMapper() takes a serializable RowMapper<> functional interface. And it is being serialized and deserialized by Beam when it needs to. However in your lambda you also use the instance of Schema that you define outside of the lambda (closure). So when serializing your lambda Java will also have to serialize the schema, because it's used there. But Schema is not serializable, so it fails.

There are few workarounds I can think of:

  • create the schema inside the lambda:

    • in this case the schema instance will not be serialized;
    • it will be created each time the lambda is called;
  • serialize the schema (e.g. into a Json string) into a serializable object outside of the lambda, and then deserialize it within the lambda:

    • it's basically the same thing as above but with an extra serialization step;
    • within the lambda it will still have to be deserialized on each call;
  • find/write a serializable Schema implementation:

    • may be not possible or hard to do;
    • will likely have less overhead as above approaches, as deserialization will only happen when creating an instance of the RowMapper<>;

I think it's perfectly fine to create a new instance of the schema in the lambda unless it causes problems.

Upvotes: 1

Related Questions