Raj

Reputation: 1727

How to pause a specific kafka consumer thread when concurrency is set to more than 1?

I am using spring-kafka 2.2.8 with concurrency set to 2, as shown below, and I am trying to understand how to pause a consumer thread/instance when a particular condition is met.

@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency = "2")
    public void listen(String in) {
        System.out.println(in);
    }

Now, I have two questions.

  1. Would my consumer spawn two different poll threads to poll the records?

  2. If I set an id on the consumer as shown above, how can I pause a specific consumer thread (with concurrency set to more than 1)?

Please suggest.

Upvotes: 0

Views: 664

Answers (1)

Gary Russell

Reputation: 174514

Use the KafkaListenerEndpointRegistry.getListenerContainer(id) method to get a reference to the container.

Cast it to a ConcurrentMessageListenerContainer and call getContainers() to get a list of the child KafkaMessageListenerContainers; you can then pause/resume them individually.

You can determine which topics/partitions each one has using getAssignedPartitions().
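The steps above could be wired together roughly as follows. This is only a sketch: the `ConsumerPauser` class and its `pauseOwnerOf` method are hypothetical names made up for illustration, and it assumes the child that should be paused is identified by a topic/partition it currently owns. Only `getListenerContainer`, `getContainers`, `getAssignedPartitions` and `pause` come from spring-kafka.

```java
import java.util.Collection;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;

// Hypothetical helper; not part of spring-kafka.
public class ConsumerPauser {

    private final KafkaListenerEndpointRegistry registry;

    public ConsumerPauser(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    /**
     * Pauses the child container that currently owns the given partition.
     * Returns true if a matching child was found and asked to pause.
     */
    public boolean pauseOwnerOf(String listenerId, String topic, int partition) {
        // Look up the container registered under the @KafkaListener id.
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (!(container instanceof ConcurrentMessageListenerContainer)) {
            return false; // unknown id, or not a concurrent container
        }
        ConcurrentMessageListenerContainer<?, ?> parent =
                (ConcurrentMessageListenerContainer<?, ?>) container;

        TopicPartition wanted = new TopicPartition(topic, partition);
        // Each child container runs its own consumer.
        for (KafkaMessageListenerContainer<?, ?> child : parent.getContainers()) {
            Collection<TopicPartition> assigned = child.getAssignedPartitions();
            // The assignment can be null before partitions have been assigned.
            if (assigned != null && assigned.contains(wanted)) {
                child.pause(); // call child.resume() later to continue consuming
                return true;
            }
        }
        return false;
    }
}
```

With the listener from the question, a call might look like `pauser.pauseOwnerOf("myConsumerId", "myTopic", 0)`. Note that `pause()` only requests the pause; the consumer thread acts on it around its next poll, so records already fetched may still be delivered to the listener.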

Upvotes: 1
