Reputation: 1980
I'm sending a byte array corresponding to an Avro record to Kafka.
The producer:
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro ClientOrderRequest Producer");
props.put("schema.registry.url", "schema-registry address");
KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);
while (true) {
    ClientOrderRequest clientOrderRequest = createClientOrderRequest();
    byte[] bytes = toByteArray(clientOrderRequest);
    final ProducerRecord<String, byte[]> producerOrderRequest = new ProducerRecord<>("client-order-request",
            "ClientOrderRequest-" + calendar.getTimeInMillis(), bytes);
    producerRequest.send(producerOrderRequest);
    producerRequest.flush();
    System.out.println("Produced 1 record.");
    Thread.sleep(2000);
}
The consumer:
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put("schema.registry.url", "schema-registry address");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-3");
KafkaConsumer<String, ClientOrderRequest> clientOrderRequestConsumer = new KafkaConsumer<>(props);
clientOrderRequestConsumer.subscribe(Collections.singletonList("client-order-request"));
    while (true) {
        ConsumerRecords<String, ClientOrderRequest> records = clientOrderRequestConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, ClientOrderRequest> record : records) {
            String key = record.key();
            ClientOrderRequest value = record.value();
            System.out.println(key);
            System.out.println(value);
        }
    }
}
The producer can send the byte array to the topic, but the consumer can't deserialize it. I get this error:
[B cannot be cast to com.swissquote.eforex.generated.avro.trading.ClientOrderRequest
A client-order-request-value subject is registered in the Schema Registry when I produce the message. I understand that I'm sending a byte array while Kafka and the Schema Registry expect an Avro record, but I would have expected the consumer to be able to deserialize it.
If that's not possible with the plain KafkaAvroSerializer, should I implement my own serializer?
Upvotes: 0
Views: 5234
Reputation: 191884
What is toByteArray?
You don't need to serialize your data manually; that's the whole point of the serializer class. It's unclear why you've changed this from your previous question. Pass the record itself to the producer:
ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
        "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
producerRequest.flush();
When you send a byte array with KafkaAvroSerializer.class, it sends an Avro object of {"type": "bytes"}, not your custom record.
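To see which schema a message was actually written with, it can help to look at the framing that the Confluent serializers put in front of the payload: one magic byte (0), then a 4-byte big-endian schema ID. The following is only a sketch using the JDK; the class name WireFormatPeek and the sample schema ID 42 are made up for illustration, and it assumes you have the raw message value as a byte array (for example from a consumer configured with ByteArrayDeserializer).

```java
import java.nio.ByteBuffer;

public class WireFormatPeek {

    /**
     * Returns the schema ID from a message framed in the Confluent wire format
     * (magic byte 0, then a 4-byte big-endian schema ID, then the payload).
     */
    public static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message too short to carry a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hypothetical framed message: magic byte, schema ID 42, two payload bytes.
        byte[] framed = ByteBuffer.allocate(7)
                .put((byte) 0)
                .putInt(42)
                .put(new byte[] {1, 2})
                .array();
        System.out.println(schemaId(framed)); // prints 42
    }
}
```

Looking up that ID in the Schema Registry should tell you whether the producer registered the "bytes" schema or the ClientOrderRequest record schema.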
If you really want to send byte arrays directly, you would use ByteArraySerializer, but this would bypass the Schema Registry.
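As a rough sketch of the raw-bytes route, the configuration could look like the following. It uses only java.util.Properties with the plain string config keys and fully qualified class names, so it compiles without the Kafka client on the classpath; the class name RawBytesConfig and its helper methods are hypothetical, and the broker address and group ID are the placeholders from the question.

```java
import java.util.Properties;

public class RawBytesConfig {

    /** Producer settings that send the value as raw bytes, with no Schema Registry involved. */
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    /** Matching consumer settings: the value arrives as byte[] and must be decoded by hand. */
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("broker-address"));
        System.out.println(consumerProps("broker-address", "events-group-3"));
    }
}
```

With this setup the producer would be a KafkaProducer<String, byte[]> and the consumer a KafkaConsumer<String, byte[]>. The consumer then has to turn the bytes back into a ClientOrderRequest itself, using the inverse of whatever toByteArray does, and no schema is registered or checked.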
Upvotes: 0