Reputation: 41
JSON Messages are being consumed by the string consumer. My produces sends two types of messages Strings and Serialized JSON
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.string-listener-container-factory}")
public void consume(@NotNull ConsumerRecord<String, String> cr, @Payload String payload) {
log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {} ", payload, cr.key(), cr.partition(),
cr.offset());
}
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.json-listener-container-factory}")
public void consumeAssetEvent(@NotNull ConsumerRecord<String, Event> cr, @Payload Event payload) {
log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {} ", payload, cr.key(), cr.partition(),
cr.offset());
}
Consumer Side I have two consumer 1. listening for string message 2. listening for json and deserialize to object
Even the json messages are consumed by String listener.
Upvotes: 1
Views: 1772
Reputation: 516
There is no filter functionality in consumers which distinguish the messages according to the Serde you have set(eg string , json etc). When a producer sends a message it is converted to byte[] in kafka topic. This byte[] is then deserialized by consumner deserialization setting.
So there is no default way to filter string messages to the string consumer and json to json consumer. Either create listener which receives all data and check if it is json or not or change the topic for string and json (send string to one topic and json to another and consume accordingly).
Upvotes: 2