Reputation: 3427
I am trying to implement a method in my custom class that produces data to Kafka using the Flink Kafka connector. The class looks like this:
public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data;
    ...
The method for writing data to a specific Kafka topic looks like this:
public void writeDataIntoESB(String id) throws Exception {
    FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
            "localhost:9092",
            id,
            new KeyedSerializationSchema<byte[]>() {
                @Override
                public byte[] serializeKey(byte[] bytes) {
                    return bytes;
                }

                @Override
                public byte[] serializeValue(byte[] bytes) {
                    return bytes;
                }

                @Override
                public String getTargetTopic(byte[] bytes) {
                    return null;
                }
            });
    data.addSink(producer);
}
I have another method that reads data from a Kafka topic into the data field of an object, and that works fine. Now, when I try to read data from one Kafka topic and write it to another Kafka topic, I get this error:
org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable
The main code:
StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");
It seems Java tries to serialize the whole object, not just the data field! The code for producing data to Kafka with the Flink Kafka connector is tested and works in regular usage (that is, with all the code written in main instead of inside a class).
How can I get rid of this error?
Upvotes: 1
Views: 2271
Reputation: 131
You can also handle serialization in Flink with a separate, named schema class, like this:
// The producer's element type must match the schema's type parameter (MyUser here).
dataStream.addSink(new FlinkKafkaProducer<MyUser>(ProducerTopic,
        new CustomSerializerSchema(), properties));

public class CustomSerializerSchema implements SerializationSchema<MyUser> {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyUser element) {
        byte[] b = null;
        try {
            // Convert the element to JSON bytes with Jackson.
            b = new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b;
    }
}
Upvotes: 0
Reputation: 4314
I believe the cause of the issue is that your code is doing this:
new KeyedSerializationSchema<byte[]>() {...}
This creates an anonymous subclass of KeyedSerializationSchema as an inner class of the defining class (StreamData). An inner class created this way holds an implicit reference to the outer class instance, so serializing it with the default Java serialization rules will also transitively try to serialize the outer object (StreamData). That presumably fails here because the outer object's DataStream field is neither transient nor serializable. The cleanest way to resolve this is to declare your subclass of KeyedSerializationSchema so that it has no enclosing instance, for example as a static nested class, a top-level class, or an anonymous class assigned to a static field.
I think that last approach would look like this:
public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {
        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);
        data.addSink(producer);
    }
}
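If you want to see the effect without Flink or Kafka, here is a minimal sketch using only the JDK. Everything in it is made up for illustration: Schema is a hypothetical stand-in for KeyedSerializationSchema, and Holder plays the role of StreamData, with a plain Object field standing in for the non-serializable DataStream. It only shows how plain Java serialization treats the two kinds of anonymous class. It is not how Flink performs its own check.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    /** Hypothetical stand-in for a Flink serialization schema. */
    interface Schema extends Serializable {
        byte[] serializeValue(byte[] bytes);
    }

    /** Mimics StreamData: Serializable itself, but one of its fields is not. */
    static class Holder implements Serializable {
        // Plays the role of the DataStream<byte[]> field (assumption for the demo).
        private final Object notSerializable = new Object();

        // Anonymous class created in a static context: no enclosing instance.
        static final Schema STATIC_SCHEMA = new Schema() {
            @Override
            public byte[] serializeValue(byte[] bytes) {
                return bytes;
            }
        };

        // Anonymous class created in an instance method: keeps a hidden
        // reference to Holder.this, even though it never uses it.
        Schema instanceSchema() {
            return new Schema() {
                @Override
                public byte[] serializeValue(byte[] bytes) {
                    return bytes;
                }
            };
        }
    }

    /** Returns true if default Java serialization of obj succeeds. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Drags the Holder instance (and its Object field) along: prints false
        System.out.println(canSerialize(new Holder().instanceSchema()));
        // Has no outer instance to drag along: prints true
        System.out.println(canSerialize(Holder.STATIC_SCHEMA));
    }
}
```

The two schemas have identical bodies. The only difference is where the anonymous class is created, which is the same change as moving the schema into a static field in the snippet above.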
Upvotes: 4