Reputation: 1980
I'm sending a byte array corresponding to an Avro record to Kafka.
The producer:
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro ClientOrderRequest Producer");
props.put("schema.registry.url", "schema-registry address");
KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);
while (true) {
    ClientOrderRequest clientOrderRequest = createClientOrderRequest();
    byte[] bytes = toByteArray(clientOrderRequest);
    final ProducerRecord<String, byte[]> producerOrderRequest = new ProducerRecord<>("client-order-request",
            "ClientOrderRequest-" + calendar.getTimeInMillis(), bytes);
    System.out.println("Produced 1 record.");
The consumer:
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put("schema.registry.url", "schema-registry address");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-3");
KafkaConsumer<String, ClientOrderRequest> clientOrderRequestConsumer = new KafkaConsumer<>(props);
while (true) {
    ConsumerRecords<String, ClientOrderRequest> records = clientOrderRequestConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, ClientOrderRequest> record : records) {
        String key = record.key();
        ClientOrderRequest value = record.value();
The producer is able to send the byte array to the topic, but the consumer isn't able to deserialize it. I get the error:
B cannot be cast to
A client-order-request-value subject is registered in the Schema Registry when I produce the message. I understand that I'm sending a byte array while Kafka and the Schema Registry expect an Avro record, but I would have expected the consumer to be able to deserialize it.
If that's not possible with the plain KafkaAvroSerializer, should I implement my own serializer?
Upvotes: 0
Views: 5234
Reputation: 191884
What is toByteArray?
You don't need to manually serialize your data; that's the whole point of the serializer class. It's unclear why you've changed this from your previous question.
ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
        "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
When you send a byte array with KafkaAvroSerializer.class, it sends an Avro object of {"type": "bytes"}, not your custom record.
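The cast failure from the question can be reproduced without Kafka. The sketch below assumes the deserializer hands back a raw byte[] for a value that was written with a bytes schema. Because the consumer's generic type is erased at runtime, the mismatch only surfaces when the value is assigned to ClientOrderRequest. The empty ClientOrderRequest class and the fakeDeserialize helper are hypothetical stand-ins, not part of any Kafka or Avro API.

```java
// Stand-in for the generated Avro class from the question (hypothetical, empty on purpose).
final class ClientOrderRequest {
}

final class CastDemo {

    // Mimics a generic deserializer: the caller picks T, but the payload is whatever was written.
    @SuppressWarnings("unchecked")
    static <T> T fakeDeserialize(Object payload) {
        return (T) payload; // unchecked cast: erased at runtime, so nothing fails here
    }

    // Reads the payload as a ClientOrderRequest and reports what happened.
    static String readAsRecord(Object payload) {
        try {
            // The compiler inserts the real cast at this assignment.
            ClientOrderRequest value = CastDemo.<ClientOrderRequest>fakeDeserialize(payload);
            return "ok: " + value.getClass().getSimpleName();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A byte[] payload, as produced when the value was serialized as bytes.
        System.out.println(readAsRecord(new byte[] {1, 2, 3}));
        // A real record payload, as produced when the record object itself was sent.
        System.out.println(readAsRecord(new ClientOrderRequest()));
    }
}
```

The first call fails the cast and the second succeeds, which is why sending the record object itself resolves the consumer error.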
If you really want to send byte arrays directly, you would use ByteArraySerializer, but this would bypass the schema registry.
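The two options differ mainly in the value (de)serializer configured on each side. Here is a minimal sketch that uses plain string keys so it runs without the Kafka client on the classpath. The broker and registry addresses are the placeholders from the question, and the class and method names are hypothetical. With the byte-array pair, the consumer would have to be typed KafkaConsumer<String, byte[]> and decode the Avro payload itself.

```java
import java.util.Properties;

final class SerializerConfigs {

    // Producer settings for sending the record object through the schema registry.
    static Properties avroProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-address");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "schema-registry address");
        return props;
    }

    // Producer settings for sending pre-serialized bytes; no schema registry is involved.
    static Properties rawBytesProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-address");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    // Matching consumer settings: values arrive as byte[] and must be decoded by the application.
    static Properties rawBytesConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-address");
        props.put("group.id", "events-group-3");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(avroProducerProps());
        System.out.println(rawBytesProducerProps());
        System.out.println(rawBytesConsumerProps());
    }
}
```

Whichever pair is chosen, the producer's value serializer and the consumer's value deserializer need to agree, and the generic types on KafkaProducer, ProducerRecord, and KafkaConsumer should match what those classes actually produce.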
Upvotes: 0