Harmeet Singh Taara

Reputation: 6611

Kafka Avro Serializer: org.apache.avro.AvroRuntimeException: not open

I am using Apache Kafka with the Avro serializer in its specific format. I am trying to create my own custom class and use it as a Kafka message value, but when I try to send the message I get the following exception:

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

My Avro Schema file as below:

{
    "namespace": "customer.avro",
    "type": "record",
    "name": "Customer",
    "fields": [{
        "name": "id",
        "type": "int"
    }, {
        "name": "name",
        "type": "string"
    }]
}

Customer Class:

public class Customer {
    public int id;
    public String name;

    public Customer() {
    }

    public Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Data Serialization using Avro:

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
        kafkaProducer.send(record);
    }

Customer customer1 = new Customer(1001, "James");

Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
        "Customer One", dataFileWriter
);
fireAndForget(record1);

I want to use a SpecificDatumWriter instead of a generic one. What is this error related to?

Upvotes: 4

Views: 8507

Answers (1)

Yuval Itzchakov

Reputation: 149518

Kafka receives a key/value pair to serialize. You're passing it a DataFileWriter, which isn't the value you want to serialize, so that's not going to work.

What you need to do is create a byte array containing the serialized Avro record, via a BinaryEncoder writing into a ByteArrayOutputStream, and then pass that array as the value of a ProducerRecord<String, byte[]>:

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream os = new ByteArrayOutputStream();

try {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
  writer.write(customer1, encoder);
  // Flush the encoder so all buffered bytes reach the stream
  encoder.flush();

  byte[] avroBytes = os.toByteArray();
  ProducerRecord<String, byte[]> record1 =
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes);

  kafkaProducer.send(record1);
} finally {
  os.close();
}
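For the reverse direction, the consumer can decode such bytes against the same schema with a BinaryDecoder. Below is a minimal, self-contained round-trip sketch (the inline schema string and record values are illustrative, and it assumes the Avro library is on the classpath; it uses the generic API so it runs without generated classes):

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTrip {
    public static void main(String[] args) throws Exception {
        // Same shape as the Customer schema from the question, inlined for a runnable example
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Encode: record -> byte[] (what the producer would send as the message value)
        GenericRecord customer = new GenericData.Record(schema);
        customer.put("id", 1001);
        customer.put("name", "James");

        ByteArrayOutputStream os = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
        new GenericDatumWriter<GenericRecord>(schema).write(customer, encoder);
        encoder.flush();
        byte[] avroBytes = os.toByteArray();

        // Decode: byte[] -> record (what the consumer would do with the received value)
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        GenericRecord decoded =
            new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        System.out.println(decoded.get("id") + " " + decoded.get("name"));
    }
}
```

Note that no DataFileWriter is involved anywhere: the raw binary encoding is all you need for a Kafka message value, and the consumer only needs the schema to decode it.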

Upvotes: 1
