Reputation: 1727
I am using spring-kafka 2.2.8 with concurrency set to 2, as shown below, and I am trying to understand how to pause a consumer thread/instance when a particular condition is met.
// Note: the concurrency attribute of @KafkaListener is a String, so the value must be quoted.
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency = "2")
public void listen(String in) {
    System.out.println(in);
}
Now I have two questions.
Would my consumer spawn two different poll threads to poll the records?
I am setting an id on the consumer as shown above. How can I pause a specific consumer thread when concurrency is set to more than 1?
Please suggest.
Upvotes: 0
Views: 664
Reputation: 174514
Use the KafkaListenerEndpointRegistry.getListenerContainer(id) method to get a reference to the container. Cast it to a ConcurrentMessageListenerContainer and call getContainers() to get a list of the child KafkaMessageListenerContainers; you can then pause/resume them individually. You can determine which topics/partitions each one has using getAssignedPartitions().
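The steps above could look roughly like the following. This is only a sketch: the class name ChildContainerPauser and the method pauseOwnerOf are made up for illustration, and it assumes the KafkaListenerEndpointRegistry bean is injected by Spring. It looks for the child container that currently owns a given partition and pauses only that one.

```java
import java.util.Collection;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;

// Hypothetical helper, not part of spring-kafka.
public class ChildContainerPauser {

    private final KafkaListenerEndpointRegistry registry;

    public ChildContainerPauser(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    /**
     * Pauses the child container that currently has the given partition assigned.
     * Returns true if such a child was found and pause() was requested on it.
     */
    public boolean pauseOwnerOf(String listenerId, TopicPartition partition) {
        // Look up the parent container by the id given on @KafkaListener.
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (!(container instanceof ConcurrentMessageListenerContainer)) {
            return false; // unknown id, or not a concurrent container
        }
        ConcurrentMessageListenerContainer<?, ?> concurrent =
                (ConcurrentMessageListenerContainer<?, ?>) container;

        // One child per consumer; with concurrency = "2" there are two while running.
        for (KafkaMessageListenerContainer<?, ?> child : concurrent.getContainers()) {
            Collection<TopicPartition> assigned = child.getAssignedPartitions();
            // assigned can be null before the consumer has received an assignment.
            if (assigned != null && assigned.contains(partition)) {
                child.pause(); // only this consumer is paused
                return true;
            }
        }
        return false;
    }
}
```

For example, pauseOwnerOf("myConsumerId", new TopicPartition("myTopic", 0)) would pause whichever child owns partition 0 of myTopic. Calling resume() on the same child undoes it. Keep in mind that partition assignments can change after a rebalance, so it is safer to look up the owner each time than to cache it.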
Upvotes: 1