Vijayendra Yadav

Reputation: 51

Flink DataStream[String] kafkaconsumer convert to Avro for Sink

Flink Streaming: I have a DataStream[String] from a FlinkKafkaConsumer, whose payload is JSON:

val stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

I have to sink this stream using StreamingFileSink, which needs a DataStream[GenericRecord]:

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html

Question: How do I convert DataStream[String] to DataStream[GenericRecord] before sinking, so that I can write Avro files?

Exception while converting the String stream to a GenericRecord stream:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:791)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1168)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
    at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:128)
    at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 7 more
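
The Task not serializable error occurs because org.apache.avro.Schema (here RecordSchema) does not implement Serializable, so it cannot be captured in the map closure. A minimal, untested sketch of the workaround described next (initializing the schema inside the mapper) could look like this; the schema JSON and the body field are hypothetical placeholders:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Keep only the schema *string* (serializable) in the function and parse the
// non-serializable Schema on the task in open(), so it never travels with the closure.
val schemaString: String =
  """{"type":"record","name":"Event","fields":[{"name":"body","type":"string"}]}"""

val records: DataStream[GenericRecord] = stream.map(new RichMapFunction[String, GenericRecord] {
  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
  }

  override def map(json: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("body", json) // real code would parse the JSON into the record's fields
    record
  }
})

Depending on the Flink version, you may also want to give the result an Avro-aware type (flink-avro's GenericRecordAvroTypeInfo) so the records themselves are not serialized with Kryo between operators.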

After initializing the schema inside the mapper, I am getting a cast exception:

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.util.Map

(The schema and msg values are shown in a screenshot, not reproduced here.)

I got past the ClassCastException by converting the Scala Map to a java.util.Map before putting it into the record (Avro map fields expect a java.util.Map):

record.put(0, scala.collection.JavaConverters.mapAsJavaMapConverter(msg._1).asJava)

Now streaming works, except that extra escape characters are added:

,"body":"\"{\\\"hdr\\\":{\\\"mes

There are extra escape backslashes; it should look like:

,"body":"\"{\"hdr\":{\"mes

The extra escape characters went away after changing toString to getAsString.
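
For illustration, assuming the JSON is handled with Gson (which the getAsString call suggests), the difference is roughly this hypothetical example:

import com.google.gson.JsonParser

// "body" holds an already-serialized JSON string.
val body = JsonParser
  .parseString("""{"body":"{\"hdr\":{\"mes\":1}}" }""") // Gson 2.8.6+; older versions use new JsonParser().parse(...)
  .getAsJsonObject
  .get("body")

println(body.toString)    // JSON representation, re-quoted and re-escaped: "{\"hdr\":{\"mes\":1}}"
println(body.getAsString) // raw string value, no extra escaping: {"hdr":{"mes":1}}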

Now it's working as expected.

Next, I need to try Snappy compression of the stream.
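
AvroWriters.forGenericRecord does not expose a compression codec option. One possible approach, assuming your flink-avro version provides AvroWriterFactory and AvroBuilder (recent versions do), is to build the Avro DataFileWriter yourself and set the codec; this is an untested sketch reusing the same outputBasePath as above:

import java.io.OutputStream

import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val schemaString: String = "..." // the Avro schema JSON used elsewhere in the job

val snappyFactory = new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord] {
  override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
    val schema = new Schema.Parser().parse(schemaString)
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.setCodec(CodecFactory.snappyCodec()) // Snappy block compression for the Avro container files
    writer.create(schema, out)
    writer
  }
})

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(outputBasePath, snappyFactory)
  .build()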

Upvotes: 0

Views: 2515

Answers (2)

wanghq09

Reputation: 1

In Java, you can use a RichMapFunction to convert a DataStream&lt;Map&lt;String, Object&gt;&gt; into a DataStream&lt;GenericRecord&gt;, holding a transient Schema field that is parsed in open() and used to build each GenericRecord. I don't know how to do this in Scala, so the following is just for reference (a rough Scala sketch is shown after the Java example).

DataStream<GenericRecord> records = maps.map(new RichMapFunction<Map<String, Object>, GenericRecord>() {
        private transient DatumWriter<IndexedRecord> datumWriter;

        /**
         * Output stream to serialize records into byte array.
         */
        private transient ByteArrayOutputStream arrayOutputStream;

        /**
         * Low-level class for serialization of Avro values.
         */
        private transient Encoder encoder;
        /**
         * Avro serialization schema.
         */
        private transient Schema schema;
        @Override
        public GenericRecord map(Map<String, Object> stringObjectMap) throws Exception {
            GenericRecord record =  new GenericData.Record(schema);
            stringObjectMap.entrySet().forEach(entry->{record.put(entry.getKey(), entry.getValue());});
            return record;
        }


        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Parse the schema first: it is needed by the datum writer created below.
            try {
                this.schema = new Schema.Parser().parse(avroSchemaString);
            } catch (SchemaParseException e) {
                throw new IllegalArgumentException("Could not parse Avro schema string.", e);
            }
            this.arrayOutputStream = new ByteArrayOutputStream();
            this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
            this.datumWriter = new GenericDatumWriter<>(schema);
        }
    });

    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(new Path("D:\\test"), AvroWriters.forGenericRecord(mongoSchema))
            .build();
    records.addSink(sink);
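
Since the question is in Scala: a rough, untested Scala counterpart of the RichMapFunction above could look like this (the class name is hypothetical; avroSchemaString is the schema JSON string):

import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// The schema string is serializable; the parsed Schema is transient and created per task in open().
class MapToGenericRecord(avroSchemaString: String)
    extends RichMapFunction[JMap[String, AnyRef], GenericRecord] {

  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(avroSchemaString)
  }

  override def map(value: JMap[String, AnyRef]): GenericRecord = {
    val record = new GenericData.Record(schema)
    value.asScala.foreach { case (k, v) => record.put(k, v) }
    record
  }
}

It would be used as maps.map(new MapToGenericRecord(avroSchemaString)) in front of the same StreamingFileSink.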

Upvotes: 0

Robert Metzger

Reputation: 4542

You need to transform your stream of Strings into a stream of GenericRecords, for example using a .map() function.

Example:

DataStream<String> strings = env.addSource( ... );
DataStream<GenericRecord> records = strings.map(inputStr -> {
    GenericData.Record rec = new GenericData.Record(schema);
    rec.put(0, inputStr);
    return rec;
});

Please note that using GenericRecord can lead to poor performance, because the schema needs to be serialized with each record over and over again. It is better to generate an Avro POJO (a SpecificRecord), as it won't need to ship the schema.
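
For example, in the Scala API used in the question, assuming a UserEvent class generated from the schema by the Avro compiler (e.g. avro-maven-plugin) and a hypothetical parseToUserEvent helper:

import org.apache.flink.formats.avro.AvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// UserEvent extends SpecificRecordBase, so the schema lives in generated code
// and does not have to be shipped with every record.
val events: DataStream[UserEvent] = stream.map(json => parseToUserEvent(json))

val sink: StreamingFileSink[UserEvent] = StreamingFileSink
  .forBulkFormat(outputBasePath, AvroWriters.forSpecificRecord(classOf[UserEvent]))
  .build()

events.addSink(sink)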

Upvotes: 1
