Reputation: 1565
I have a Java Spring Boot Kafka consumer (spring-kafka) that consumes from a topic. The topic has 3 partitions, and below is the code for the consumer.
@KafkaListener(topics = "myTopic", groupId = "myGroup", autoStartup = "true", concurrency = "3")
public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                    @Header("custom_header_1") String customHeader1,
                    @Header("custom_header_2") String customHeader2,
                    @Header("custom_header_3") String customHeader3,
                    @Header(required = false, name = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                    @Payload(required = false) String message) {
    log.info("-------------------------");
    log.info(key);
    log.info(message);
    log.info("-------------------------");
}
I have set concurrency to 3 because my topic has 3 partitions. I send several messages with different keys; the key is the id of an entity in my application.
As I understand it, with concurrency set to 3 the listener runs 3 threads, each acting as a separate consumer, rather than a single consumer consuming all the messages.
What I want to know is which consumer consumes from which partition. Is there a way to see, for these 3 consumers, which consumer consumed which message? For example, suppose my partitions are myTopic0, myTopic1 and myTopic2, and my consumers are consumer0, consumer1 and consumer2. Is there a way to see programmatically that messageX was consumed from partition myTopic1 by consumer1?
Upvotes: 0
Views: 1793
Reputation: 174564
Thread.currentThread().getName() will contain the thread name, which is the listener id followed by -n, where n is 0, 1, 2. Give the listener an id instead of letting the framework generate one. (Your log messages will also include it if you have %t in your log pattern.)
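To make that concrete, here is a minimal sketch of a listener that logs the thread name next to the partition of each record. The id "myListener" and the class name MyTopicListener are made up for illustration. It also assumes String key and value deserializers, and takes the whole ConsumerRecord instead of individual headers so that the partition and offset are available directly.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyTopicListener {

    private static final Logger log = LoggerFactory.getLogger(MyTopicListener.class);

    // "myListener" is a hypothetical id chosen for this sketch. With
    // concurrency = "3", the consumer thread names should start with
    // myListener-0, myListener-1 and myListener-2.
    @KafkaListener(id = "myListener", topics = "myTopic", groupId = "myGroup", concurrency = "3")
    public void consume(ConsumerRecord<String, String> record) {
        // Log which thread (consumer) handled the record and which
        // partition and offset it was read from.
        log.info("thread={} partition={} offset={} key={} value={}",
                Thread.currentThread().getName(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }
}
```

Keep in mind that the partition-to-consumer assignment is decided by the group rebalance, so the same thread may end up with a different partition after a restart or a rebalance.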
Upvotes: 1