Reputation: 147
As native KafkaConsumer is not thread safe, so it is discouraged to call pause and resume methods from different thread instead of kafka-consumer processing thread. but as spring-kafka provides another layer KafkaMessageListenerContainer which internally use kafka-consumer. So my question is can we use KafkaListenerEndpointRegistry to get the listener container by id and call resume or pause method from other thread rather than consumer processing thread.
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
Upvotes: 1
Views: 1367
Reputation: 174554
Yes; container.pause()
sets a flag to tell the Consumer
thread to pause before its next poll()
call. Similarly, resume()
resets the flag so the consumer thread will resume the Consumer
before the next poll.
Upvotes: 2