Soheil Pourbafrani

Reputation: 3427

Flink Kafka producer: Object of class is not serializable

I'm trying to implement a method in my custom class that produces data to Kafka using the Flink Kafka connector. The class prototype is the following:

public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data;
    ...

The method for writing data to a specific Kafka topic looks like this:

public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
                "localhost:9092",
                id,
                new KeyedSerializationSchema<byte[]>() {
                    @Override
                    public byte[] serializeKey(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public byte[] serializeValue(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public String getTargetTopic(byte[] bytes) {
                        return null;
                    }
                });               
        data.addSink(producer);
    }

I have another method for getting data from a Kafka topic into the data field of an object, and that works fine. But when I try to get data from a Kafka topic and write it to another Kafka topic, I get the error:

org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable

The main code:

StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");

It seems Java tries to serialize the whole object, not just the field data! The code for producing data to Kafka using the Flink Kafka connector is tested and works in regular usage (I mean without classes, with all the code written in main).

How can I get rid of this error?

Upvotes: 1

Views: 2271

Answers (2)

Aarya

Reputation: 131

You can also do serialization in Flink like this:

dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic,
        new CustomSerializerSchema(), properties));

public class CustomSerializerSchema implements SerializationSchema<KafkaObject> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(KafkaObject element) {
        byte[] b = null;
        try {
            b = new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b;
    }
}

Upvotes: 0

Michal Borowiecki

Reputation: 4314

I believe the cause of the issue is that your code is doing this:

new KeyedSerializationSchema<byte[]>() {...}

What this code does is create an anonymous subclass of KeyedSerializationSchema as an inner class of the defining class (StreamData). Every inner class holds an implicit reference to the outer class instance, so serializing it using the default Java serialization rules will also transitively try to serialize the outer object (StreamData). The nicest ways of resolving this issue are to declare your subclass of KeyedSerializationSchema as either:

- a top-level class,
- a static nested class, or
- an anonymous class assigned to a static field.

That last approach I think would look like this:

public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);               
        data.addSink(producer);
    }
}
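To see the implicit outer reference in action, here is a minimal, self-contained sketch (class and field names are made up for illustration; no Flink dependency). An anonymous class created in an instance method of a non-serializable outer class fails default Java serialization, while one created in a static context serializes fine:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassDemo {

    // Stand-in for a non-serializable outer class like StreamData
    // (which holds a StreamExecutionEnvironment).
    static class Outer {
        final String name = "outer";

        Serializable makeAnonymous() {
            return new Serializable() {
                @Override
                public String toString() {
                    // Referencing Outer's field forces the anonymous class to
                    // keep an implicit reference to the Outer instance.
                    return "inner of " + name;
                }
            };
        }
    }

    // The same kind of anonymous class declared in a static context has no
    // enclosing instance, so nothing extra gets dragged into serialization.
    static final Serializable STATIC_INSTANCE = new Serializable() {};

    // Attempts default Java serialization and reports whether it succeeded.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Fails: serializing the inner object transitively serializes Outer.
        System.out.println(isJavaSerializable(new Outer().makeAnonymous()));
        // Succeeds: no outer reference to pull in.
        System.out.println(isJavaSerializable(STATIC_INSTANCE));
    }
}
```

This is exactly why moving the KeyedSerializationSchema instance into a static field (or a top-level class) fixes the InvalidProgramException: the schema no longer carries a hidden reference to the StreamData instance.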

Upvotes: 4
