Reputation: 51
Flink streaming: I have a DataStream[String] of JSON messages coming from a Kafka consumer:
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I have to sink this stream using StreamingFileSink, which needs a DataStream[GenericRecord]:
val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
  .build()
input.addSink(sink)
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
Question: How do I convert a DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?
Exception while converting the String stream to a GenericRecord stream:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:791)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1168)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:128)
at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 7 more
After initializing the schema inside the mapper, I get a cast exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.util.Map
[Screenshot of the schema and msg]
I got past the ClassCastException by converting the Scala map to a Java map:
record.put(0,scala.collection.JavaConverters.mapAsJavaMapConverter(msg._1).asJava)
Now the streaming works, except that extra escape characters are added:
,"body":"\"{\\\"hdr\\\":{\\\"mes
There are extra escape backslashes (\). It should look like:
,"body":"\"{\"hdr\":{\"mes
The extra escaping went away after changing toString to getAsString.
Now it's working as expected.
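For anyone hitting the same symptom, here is a minimal sketch of why the backslashes multiply. It assumes a JSON library in which toString returns the JSON-encoded form of a string value while getAsString returns the raw value (Gson's JsonElement behaves this way; the post does not say which library was used). The jsonQuote helper is hypothetical, written only for this illustration, and handles just quotes and backslashes.

```java
final class JsonQuoteDemo {
    // Hypothetical helper: encodes a raw string as a JSON string literal.
    // Only quotes and backslashes are escaped, which is enough for this demo.
    static String jsonQuote(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : raw.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        String body = "{\"hdr\":{}}";     // raw value: {"hdr":{}}
        String once = jsonQuote(body);    // one encoding pass
        String twice = jsonQuote(once);   // encoding an already-encoded value
        System.out.println(once);         // "{\"hdr\":{}}"
        System.out.println(twice);        // "\"{\\\"hdr\\\":{}}\""
    }
}
```

Putting an already-encoded string into a record that is encoded again on write gives the second form, which matches the \\\" pattern in the output above. Passing the raw value avoids the extra pass.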
Next I need to try SNAPPY compression of the stream.
Upvotes: 0
Views: 2515
Reputation: 1
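In Java, you should use a RichMapFunction to convert the DataStream<Map<String, Object>> into a DataStream<GenericRecord>, and keep the Schema in a transient field, initialized in open(), that is used to build each GenericRecord. I don't know how to do this in Scala, so this is just for reference.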
DataStream<GenericRecord> records = maps.map(new RichMapFunction<Map<String, Object>, GenericRecord>() {
    private transient DatumWriter<IndexedRecord> datumWriter;
    /**
     * Output stream to serialize records into byte array.
     */
    private transient ByteArrayOutputStream arrayOutputStream;
    /**
     * Low-level class for serialization of Avro values.
     */
    private transient Encoder encoder;
    /**
     * Avro serialization schema.
     */
    private transient Schema schema;

    @Override
    public GenericRecord map(Map<String, Object> stringObjectMap) throws Exception {
        GenericRecord record = new GenericData.Record(schema);
        stringObjectMap.entrySet().forEach(entry -> record.put(entry.getKey(), entry.getValue()));
        return record;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Parse the schema first: the writer below needs a non-null schema.
        // avroSchemaString is a plain String (serializable) defined outside this function.
        try {
            this.schema = new Schema.Parser().parse(avroSchemaString);
        } catch (SchemaParseException e) {
            throw new IllegalArgumentException("Could not parse Avro schema string.", e);
        }
        this.arrayOutputStream = new ByteArrayOutputStream();
        this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
        this.datumWriter = new GenericDatumWriter<>(schema);
    }
});
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("D:\\test"), AvroWriters.forGenericRecord(mongoSchema))
    .build();
records.addSink(sink);
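To see why the transient field matters, here is a small sketch using only plain Java serialization, with no Flink or Avro on the classpath. FakeSchema, EagerMapper, LazyMapper and SerializationDemo are hypothetical names invented for this illustration. FakeSchema stands in for a type that is not Serializable, as org.apache.avro.Schema$RecordSchema is reported to be in the stack trace from the question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a type that does not implement Serializable.
final class FakeSchema {
    final String definition;
    FakeSchema(String definition) { this.definition = definition; }
}

// Holds the non-serializable object directly, so serializing it fails.
final class EagerMapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeSchema schema;
    EagerMapper(FakeSchema schema) { this.schema = schema; }
}

// Ships only the String; the transient field is rebuilt on first use.
final class LazyMapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String schemaString;
    private transient FakeSchema schema; // skipped by Java serialization

    LazyMapper(String schemaString) { this.schemaString = schemaString; }

    FakeSchema schema() {
        if (schema == null) {
            schema = new FakeSchema(schemaString);
        }
        return schema;
    }
}

final class SerializationDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // True if Java serialization accepts the object, false on NotSerializableException.
    static boolean isSerializable(Object o) {
        try {
            serialize(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes and deserializes, roughly what happens when a function is shipped to a worker.
    static Object roundTrip(Object o) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialize(o)))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new EagerMapper(new FakeSchema("{}")))); // false
        System.out.println(isSerializable(new LazyMapper("{}")));                  // true
        LazyMapper copy = (LazyMapper) roundTrip(new LazyMapper("{}"));
        System.out.println(copy.schema().definition);                              // {}
    }
}
```

The RichMapFunction above follows the same idea, except that open() rebuilds the schema once per task instead of lazily on first access.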
Upvotes: 0
Reputation: 4542
You need to transform your stream of Strings into a stream of GenericRecords, for example using a .map() function.
Example:
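DataStream<String> strings = env.addSource( ... );
// Note: the lambda captures schema, so schema must be serializable here
// (the stack trace in the question shows Schema$RecordSchema is not, in that Avro version).
DataStream<GenericRecord> records = strings.map(inputStr -> {
    GenericData.Record rec = new GenericData.Record(schema);
    rec.put(0, inputStr);
    return rec;
});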
Please note that using GenericRecord can lead to poor performance, because the schema needs to be serialized with each record over and over again.
It is better to generate an Avro POJO, as it won't need to ship the schema.
Upvotes: 1