Omegaspard
Omegaspard

Reputation: 1980

Send byte array and derserialize it as Avro record with kafka

I'm sending byte array corresponding to Avro Record to kafka.

The producer:

props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro ClientOrderRequest Producer");
props.put("schema.registry.url", "schema-registry address");

KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);

        
while (true) {
    ClientOrderRequest clientOrderRequest = createClientOrderRequest();
    byte[] bytes = toByteArray(clientOrderRequest);
    final ProducerRecord<String, byte[]> producerOrderRequest = new ProducerRecord<>("client-order-request",
                "ClientOrderRequest-" + calendar.getTimeInMillis(), bytes);
    producerRequest.send(producerOrderRequest);
    producerRequest.flush();
    System.out.println("Produced 1 record.");
    Thread.sleep(2000);
}

The consumer:

props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put("schema.registry.url", "schema-registry address");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-3");

KafkaConsumer<String, ClientOrderRequest> clientOrderRequestConsumer = new KafkaConsumer<>(props);
        clientOrderRequestConsumer.subscribe(Collections.singletonList("client-order-request"));

while (true) {
    ConsumerRecords<String, ClientOrderRequest> records = clientOrderRequestConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, ClientOrderRequest> record : records) {
        String key = record.key();
        ClientOrderRequest value = record.value();
            System.out.println(key);
            System.out.println(value);
        }
    }
}

The producer is able to send the byte array to the topic. But the Consumer isn't able to deserialize it. I have the error:

B cannot be cast to com.swissquote.eforex.generated.avro.trading.ClientOrderRequest

A client-order-request-value subject is registered on the schema registry when I produce the message. I understand that I send a byte array and that the kafka/schema-registry expect an Avro record but I would have expect it would be able to deserialize it.

If it's not possible using the simple AvroKafkaSerializer should I implement my own serializer ?

Upvotes: 0

Views: 5234

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191884

What is toByteArray?

You don't need to manually serialize your data. That's the whole point of the serializer class. Unclear why you've changed this from your previous question

ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
                "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
producerRequest.flush();

When you send a byte array with KafkaAvroSerializer.class, it sends an Avro object of {"type": "bytes"}, not your custom record

If you really want to send byte arrays directly, you would use ByteArraySerializer, but this would bypass the schema registry

Upvotes: 0

Related Questions