Reputation: 196
I am trying to write to Confluent Kafka with Schema Registry from Flink using FlinkKafkaProducer010, and the error below is produced. I created a custom schema serializer, see the ConfluentAvroSerializationSchema class. The code compiles but fails at runtime. A complete code example that reproduces the error is at https://github.com/dmiljkovic/test-flink-schema-registry. All resources (the Kafka cluster and the schema registry) are mocked; the code is actually a test case.
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class ConfluentAvroSerializationSchema<T> implements SerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final String topic;
    private final KafkaAvroSerializer avroSerializer;

    public ConfluentAvroSerializationSchema(String topic, KafkaAvroSerializer avroSerializer) {
        this.topic = topic;
        this.avroSerializer = avroSerializer;
    }

    @Override
    public byte[] serialize(T obj) {
        return avroSerializer.serialize(topic, obj);
    }
}
// serialize avro
KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(schemaRegistry);
avroSerializer.configure(new HashMap(registryProp), false);
ConfluentAvroSerializationSchema<TestRecordAvro> ser =
        new ConfluentAvroSerializationSchema<>(topic, avroSerializer);

// write to kafka
FlinkKafkaProducer010.writeToKafkaWithTimestamps(ds, topic, ser, flinkProp);
org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@6e28bb87 is not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:109)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.<init>(FlinkKafkaProducerBase.java:145)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09.<init>(FlinkKafkaProducer09.java:130)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.<init>(FlinkKafkaProducer010.java:227)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.writeToKafkaWithTimestamps(FlinkKafkaProducer010.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.writeToKafkaWithTimestamps(FlinkKafkaProducer010.java:115)
at com.acme.kafka_avro_util.TestProducer.testAvroConsumer(TestProducer.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroSerializer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:107)
Upvotes: 4
Views: 4757
Reputation: 134
Change

private final KafkaAvroSerializer avroSerializer;

to

private transient KafkaAvroSerializer avroSerializer;

(dropping final so the field can be re-assigned after deserialization), then initialize it in a setup method. Sort of:

@Setup
public void start() {
    this.avroSerializer = // initialization
}
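One way to apply this in the SerializationSchema from the question: SerializationSchema has no setup hook in this Flink version (the @Setup annotation above is pseudocode), so the transient serializer can instead be built lazily from serializable configuration. This is only a sketch; registryProps is an assumed serializable Map field holding the schema-registry settings, kept alongside topic.

// Sketch only: registryProps is an assumed serializable Map<String, Object> field;
// the transient serializer is rebuilt on the task manager when first needed.
private transient KafkaAvroSerializer avroSerializer;

private KafkaAvroSerializer serializer() {
    if (avroSerializer == null) {
        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(registryProps, false); // false = value serializer
    }
    return avroSerializer;
}

@Override
public byte[] serialize(T obj) {
    return serializer().serialize(topic, obj);
}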
Upvotes: 3
Reputation: 2921
Flink has to serialize all operators (including your sink) to ship them to the task managers. The problem is that the KafkaAvroSerializer you are using in your ConfluentAvroSerializationSchema class is not serializable at all, which makes your sink non-serializable. You could initialize the KafkaAvroSerializer lazily, e.g. pass the properties for the schema registry to ConfluentAvroSerializationSchema instead of a KafkaAvroSerializer instance, and create the KafkaAvroSerializer on the first call of serialize().
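A minimal sketch of that approach, assuming the constructor is changed to take the schema-registry properties as a Map (the constructor signature and the map type are assumptions; the question's class accepts a ready-made KafkaAvroSerializer):

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.util.HashMap;
import java.util.Map;

public class ConfluentAvroSerializationSchema<T> implements SerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final String topic;
    // Serializable state only: topic name and schema-registry properties
    // (e.g. schema.registry.url).
    private final Map<String, Object> registryProps;
    // KafkaAvroSerializer is not serializable, so it is transient and
    // created on the worker on the first serialize() call.
    private transient KafkaAvroSerializer avroSerializer;

    public ConfluentAvroSerializationSchema(String topic, Map<String, Object> registryProps) {
        this.topic = topic;
        this.registryProps = new HashMap<>(registryProps);
    }

    @Override
    public byte[] serialize(T obj) {
        if (avroSerializer == null) {
            avroSerializer = new KafkaAvroSerializer();
            avroSerializer.configure(registryProps, false); // false = value serializer
        }
        return avroSerializer.serialize(topic, obj);
    }
}

The call site in the question would then pass the registry properties instead of the pre-built avroSerializer, so nothing non-serializable is captured by the sink.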
Upvotes: 3