Swapneel Kulkarni

Reputation: 41

Consume String and JSON messages from the same Kafka topic - issue with deserialization

JSON messages are being consumed by the string consumer. My producer sends two types of messages to the same topic: plain strings and serialized JSON.

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.string-listener-container-factory}")
public void consume(@NotNull ConsumerRecord<String, String> cr, @Payload String payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}", payload, cr.key(), cr.partition(),
            cr.offset());
}


@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.json-listener-container-factory}")
public void consumeAssetEvent(@NotNull ConsumerRecord<String, Event> cr, @Payload Event payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}", payload, cr.key(), cr.partition(),
            cr.offset());
}

On the consumer side I have two listeners:

1. one listening for string messages
2. one listening for JSON messages and deserializing them into an object

Even the JSON messages are consumed by the string listener.

Upvotes: 1

Views: 1772

Answers (1)

Vinit Pillai

Reputation: 516

Consumers have no filter functionality that distinguishes messages according to the Serde you have configured (e.g. string, JSON, etc.). When a producer sends a message, it is stored in the Kafka topic as a byte[]. That byte[] is then deserialized by whatever deserializer the consumer is configured with, so a string deserializer will decode JSON bytes as a plain string without complaint.
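To make the point about byte[] concrete, here is a minimal sketch using only the JDK. The class `BytesOnTheWire` and its methods are hypothetical stand-ins, not Kafka or Spring Kafka APIs; they assume UTF-8 encoding, which is the default for Kafka's string serializer and deserializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not a Kafka API: mimics what happens to a
// record value between a producer and a string-based consumer.
final class BytesOnTheWire {

    private BytesOnTheWire() {
    }

    // Stand-in for the producer side: the value becomes UTF-8 bytes,
    // whether it started out as plain text or as a JSON document.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for a string deserializer: it decodes the bytes as UTF-8
    // and has no way of knowing what format the producer intended.
    static String decodeAsString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }
}
```

Calling `decodeAsString(encode("{\"assetId\":\"a-1\"}"))` succeeds just as it does for a plain string, which is why the string listener does not reject the JSON records.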

So there is no built-in way to route string messages to the string consumer and JSON messages to the JSON consumer. You have two options: either create a single listener that receives all the data and checks whether each message is JSON, or use separate topics (send strings to one topic and JSON to another, and consume each accordingly).
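The first option could look roughly like the sketch below. It assumes a single listener that receives every record value as a String and hands it to a hypothetical helper, `PayloadRouter`, which is not part of Spring Kafka or the original post. The check is only a cheap heuristic on the outer delimiters; in a real application you would more likely try to parse the payload with a JSON library and fall back to the string path on failure.

```java
import java.util.function.Consumer;

// Hypothetical helper: routes a raw String payload to one of two handlers.
final class PayloadRouter {

    private PayloadRouter() {
    }

    // Heuristic only: treats the payload as JSON when, after trimming, it is
    // wrapped in {...} or [...]. The content in between is not validated.
    static boolean looksLikeJson(String payload) {
        if (payload == null) {
            return false;
        }
        String s = payload.trim();
        if (s.length() < 2) {
            return false;
        }
        char first = s.charAt(0);
        char last = s.charAt(s.length() - 1);
        return (first == '{' && last == '}') || (first == '[' && last == ']');
    }

    // Passes the original payload to onJson or onText depending on the check.
    static void route(String payload, Consumer<String> onJson, Consumer<String> onText) {
        if (looksLikeJson(payload)) {
            onJson.accept(payload);
        } else {
            onText.accept(payload);
        }
    }
}
```

Inside the single listener method, the call would be something like `PayloadRouter.route(payload, this::handleEventJson, this::handleAssetId)`, where both handler names are placeholders for your own methods.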

Upvotes: 2
