Reputation: 111
I am trying to deserialize Kafka events in my Flink streaming job. This is my code:
...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))
And the job throws this exception at runtime:
...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 26 more
Process finished with exit code 1
I read that I should not use Kryo here, but I have no idea how to avoid it. I tried:
executionConfig.enableForceAvro()
executionConfig.disableForceKryo()
but it doesn't help.
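For context, here is a minimal sketch of where I set those flags (env is the same StreamExecutionEnvironment used for addSource above):
// both flags live on the environment's ExecutionConfig
val executionConfig = env.getConfig
executionConfig.enableForceAvro()   // force Flink's Avro serializer for analyzable POJO types
executionConfig.disableForceKryo()  // stop forcing Kryo for types Flink can analyze itself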
Upvotes: 2
Views: 7442
Reputation: 119
I also came across the same issue in Java. The code snippet below helped me resolve it:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// the class is not public, so it has to be looked up reflectively
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
environment.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
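This registers UnmodifiableCollectionsSerializer as Kryo's default serializer for unmodifiable collections, so Kryo no longer tries to rebuild them via add() during deserialization, which is exactly the UnsupportedOperationException in the stack trace above.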
You also need to add a Maven dependency to resolve UnmodifiableCollectionsSerializer:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>
Upvotes: 3
Reputation: 121
I faced the same issue with Avro GenericRecord over a Kinesis data stream, using Scala 2.12 and Flink 1.11.4.
My solution was to add an implicit TypeInformation, so that Flink serializes the GenericRecord stream with its Avro type information instead of falling back to Kryo:
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
Below is a full code example focusing on the serialization problem:
@Test def `test avro generic record serializer`(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val schema: String =
    """
      |{
      |  "namespace": "com.mberchon.monitor.dto.avro",
      |  "type": "record",
      |  "name": "TestAvro",
      |  "fields": [
      |    {"name": "strVal", "type": ["null", "string"]},
      |    {"name": "longVal", "type": ["null", "long"]}
      |  ]
      |}
    """.stripMargin
  val avroSchema = new Schema.Parser().parse(schema)
  val rec: GenericRecord = new GenericRecordBuilder(avroSchema)
    .set("strVal", "foo")
    .set("longVal", 1234L)
    .build()
  implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
  val _ = env.fromElements(rec, rec).addSink(new PrintSinkFunction[GenericRecord]())
  env.execute("Test serializer")
}
Coming back to your context, the following code should work (the implicit needs to be in scope where addSource is called):
...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))
Upvotes: 3
Reputation: 2957
If you can't use the Java environment to add a source (maybe you're using the StreamExecutionEnvironment.readFile method), there is another solution shared here: https://stackoverflow.com/a/32453031/899937. Essentially:
val unmodifiableCollectionClass = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.addDefaultKryoSerializer(unmodifiableCollectionClass, classOf[UnmodifiableCollectionsSerializer])
kryo-serializers is not included in Flink anymore, so you have to add it as a dependency.
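If you build with sbt rather than Maven, the equivalent dependency would be something like this (0.45 is the version from the answer above; adjust as needed):
libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.45"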
Upvotes: 3
Reputation: 111
The mentioned exception is caused by an issue with the Scala implementation of the Avro deserialization. It works fine if I use the Java implementation (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro). My solution:
val javaStream = env.getJavaEnv.addSource(
  new FlinkKafkaConsumer[GenericRecord](
    kafkaTopic,
    ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL),
    properties),
  new GenericRecordAvroTypeInfo(schema))
val stream = new DataStream[GenericRecord](javaStream)
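The wrapped stream then behaves like any Scala DataStream. A hypothetical downstream use, assuming the usual import org.apache.flink.streaming.api.scala._ implicits are in scope and using the url field from the case class above:
// hypothetical example: extract one field from each GenericRecord and print it
stream.map(record => record.get("url").toString).print()
env.execute("kafka-avro-job") // job name is just an example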
Upvotes: 0