Reputation: 6611
I am using Apache Kafka with the Avro serializer in its specific format. I am trying to use my own custom class as the Kafka message value, but when I try to send the message I get the following exception:
Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
My Avro schema file is as follows:
{
  "namespace": "customer.avro",
  "type": "record",
  "name": "Customer",
  "fields": [{
    "name": "id",
    "type": "int"
  }, {
    "name": "name",
    "type": "string"
  }]
}
Customer Class:
public class Customer {
    public int id;
    public String name;

    public Customer() {
    }

    public Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
Data Serialization using Avro:
public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
    kafkaProducer.send(record);
}

Customer customer1 = new Customer(1001, "James");
Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();
ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
    "Customer One", dataFileWriter
);
fireAndForget(record1);
I want to use SpecificDatumWriter instead of the generic one. What is this error related to?
Upvotes: 4
Views: 8507
Reputation: 149518
Kafka takes a key/value pair and serializes it. You are passing a DataFileWriter as the value, which is not the data you want to serialize, so that is not going to work. (The "not open" exception itself is thrown because append() is called on a DataFileWriter that was never opened with create().)
What you need to do instead is serialize the Avro record into a byte array via a BinaryEncoder and a ByteArrayOutputStream, and then pass that array to a ProducerRecord<String, byte[]>:
// Assumes kafkaProducer is typed for byte[] values (KafkaProducer<String, byte[]>).
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
    // Encode the record straight into the in-memory stream.
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
    writer.write(customer1, encoder);
    // Flush the encoder so buffered bytes reach the stream before it is read.
    encoder.flush();
    byte[] avroBytes = os.toByteArray();
    ProducerRecord<String, byte[]> record1 =
        new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes);
    kafkaProducer.send(record1);
} finally {
    os.close();
}
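To make it clearer what ends up in that byte array, here is a small sketch that encodes the same Customer fields by hand, following the Avro binary encoding rules (zigzag variable-length integers, and strings as a length followed by UTF-8 bytes). It uses only the JDK and is not how you would do it in production. The class name AvroBinarySketch and its helper methods are made up for this illustration and are not part of the Avro API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Avro or Kafka.
final class AvroBinarySketch {

    // Avro writes int and long values as zigzag-encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            // Emit the low 7 bits with the continuation bit set.
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Avro writes a string as its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is just its fields in schema order: id (int), then name (string).
    static byte[] encodeCustomer(int id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, name);
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encodeCustomer(1001, "James")));
    }
}
```

For Customer(1001, "James") this prints d2 0f 0a 4a 61 6d 65 73: two bytes for the id, one for the string length, and five for the name. Note that the schema is not part of these bytes, so the consumer needs the same schema to decode them.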
Upvotes: 1