SBhogal

Reputation: 147

Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka

The native KafkaConsumer is not thread-safe, so calling its pause() and resume() methods from any thread other than the consumer's processing thread is discouraged. Spring Kafka, however, adds another layer, KafkaMessageListenerContainer, which uses a KafkaConsumer internally. So my question is: can we use KafkaListenerEndpointRegistry to look up the listener container by id and call pause() or resume() from a thread other than the consumer's processing thread?

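    // Pause the listener container from the calling thread
    kafkaListenerEndpointRegistry.getListenerContainer("id").pause();

    // Needs java.util.concurrent.ExecutorService and java.util.concurrent.Executors
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(() -> {
        System.out.println("CurrentThread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        // Resume from a pool thread, not the consumer's processing thread
        kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
    });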

Upvotes: 1

Views: 1367

Answers (1)

Gary Russell

Reputation: 174554

Yes; container.pause() sets a flag to tell the Consumer thread to pause before its next poll() call. Similarly, resume() resets the flag so the consumer thread will resume the Consumer before the next poll.
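To illustrate the flag handoff described above, here is a minimal plain-Java sketch. It is not Spring Kafka's source: `PauseFlagSketch`, `SingleThreadedConsumer`, `SketchContainer`, and `pollOnce()` are hypothetical names made up for this example, and the stand-in consumer simply rejects calls from any thread other than the one that created it. The point is only that `pause()` and `resume()` on the container touch a flag, while the consumer itself is touched solely by its owning thread just before the next poll.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class PauseFlagSketch {

    /** Hypothetical stand-in for a non-thread-safe consumer: only its creating thread may use it. */
    static final class SingleThreadedConsumer {
        private final Thread owner = Thread.currentThread();
        private boolean paused;

        private void checkOwner() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("consumer accessed from a non-owner thread");
            }
        }

        void pause() { checkOwner(); paused = true; }
        void resume() { checkOwner(); paused = false; }
        boolean isPaused() { checkOwner(); return paused; }
    }

    /** Hypothetical stand-in for a listener container that owns the consumer. */
    static final class SketchContainer {
        private final AtomicBoolean pauseRequested = new AtomicBoolean(false);
        private final SingleThreadedConsumer consumer = new SingleThreadedConsumer();

        // Safe to call from any thread: these only flip a flag.
        void pause() { pauseRequested.set(true); }
        void resume() { pauseRequested.set(false); }

        // Must run on the consumer thread: applies the requested state before "polling".
        boolean pollOnce() {
            boolean wanted = pauseRequested.get();
            if (wanted && !consumer.isPaused()) {
                consumer.pause();
            } else if (!wanted && consumer.isPaused()) {
                consumer.resume();
            }
            return consumer.isPaused();
        }
    }

    /** Requests a pause from a second thread, then polls on the thread that owns the consumer. */
    static boolean pausedAfterCrossThreadRequest() {
        SketchContainer container = new SketchContainer(); // the calling thread owns the consumer
        Thread other = new Thread(container::pause, "other-thread");
        other.start();
        try {
            other.join(); // wait until the pause request has been made
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return container.pollOnce();
    }

    public static void main(String[] args) {
        System.out.println("paused after request from other thread: " + pausedAfterCrossThreadRequest());
    }
}
```

In this sketch the second thread never touches the consumer, so the ownership check never fires; the pause only takes effect when the owning thread reaches its next `pollOnce()`.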

Upvotes: 2
