Sumit Sood
Sumit Sood

Reputation: 485

Spring Kafka: Close the container and read the messages from specific offset with ConcurrentKafkaListenerContainerFactory

In my Spring Kafka application, I want to trigger the consumer at run time based on input from a scheduler. The scheduler tells the listener which topic it can start consuming messages from. It is a Spring Boot application with a custom ConcurrentKafkaListenerContainerFactory class. I need to perform three tasks:

  1. Close the container after successfully reading all the messages available on the topic.
  2. Store the current offset in a DB or on the file system.
  3. The next time the consumer comes up, use the stored offset to process the records instead of the default offset managed by Kafka, so that in future we can change the offset value in the DB and get the desired reports. I know how to handle all of this with @KafkaListener, but I am not sure how to hook it up with ConcurrentKafkaListenerContainerFactory. The current code is listed below:
/**
 * Spring Boot entry point of the sample; also defines the {@code testTopic} topic bean.
 */
@SpringBootApplication
public class KafkaApp {

    /** Name of the topic defined by {@link #topic()}. */
    private static final String TOPIC_NAME = "testTopic";

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    /**
     * Builds the {@code testTopic} definition with one partition and one replica.
     *
     * @return the topic definition bean
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(1)
                .replicas(1)
                .build();
    }

}

     /**
      * Listens on {@code testTopic} for topic names and, for each name received, creates the
      * topic, starts a dedicated batch listener container on its partition 0 and publishes ten
      * test records to it.
      */
     @Component
     class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    /** Reflective handle on {@link #otherListen(List)}, used as the target of the batch adapter. */
    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    /**
     * @param factory  factory used to create the per-topic containers
     * @param admin    source of the configuration properties for the {@code AdminClient}
     * @param template template used to publish the test records
     * @param bpp      post processor that supplies the {@code MessageHandlerMethodFactory}
     */
    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    /**
     * Handles one record from {@code testTopic}, whose payload is the name of the topic to consume.
     *
     * @param topicName name of the topic to create, listen on and publish test records to
     */
    @KafkaListener(id = "myId", topics = "testTopic")
    public void listen(String topicName) {
        // Best-effort topic creation: any failure is logged and the method carries on.
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        // The strategy returns true for records to drop, so only records keyed "foo" reach
        // otherListen. Constant-first equals keeps a record with a null key from throwing a
        // NullPointerException; such a record is simply dropped.
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !"foo".equals(record.key()));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        container.start();
        // Publish ten test records to partition 0, alternating the keys "foo" and "bar".
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    /**
     * Batch handler invoked through the adapter wired up in {@link #listen(String)}.
     *
     * @param others the payloads of the batch
     */
    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

}

EDIT

/**
 * Spring Boot entry point of the sample; also defines the {@code testTopic} topic bean.
 */
@SpringBootApplication
public class KafkaApp {

    /** Name of the topic defined by {@link #topic()}. */
    private static final String TOPIC_NAME = "testTopic";

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    /**
     * Builds the {@code testTopic} definition with one partition and one replica.
     *
     * @return the topic definition bean
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(1)
                .replicas(1)
                .build();
    }

}

     /**
      * Listens on {@code testTopic} for topic names and, for each name received, creates the
      * topic, starts a dedicated batch listener container on its partition 0 (with idle events
      * enabled) and publishes ten test records to it.
      */
     @Component
     class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    /** Reflective handle on {@link #otherListen(List)}, used as the target of the batch adapter. */
    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    /**
     * @param factory  factory used to create the per-topic containers
     * @param admin    source of the configuration properties for the {@code AdminClient}
     * @param template template used to publish the test records
     * @param bpp      post processor that supplies the {@code MessageHandlerMethodFactory}
     */
    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    /**
     * Handles one record from {@code testTopic}, whose payload is the name of the topic to consume.
     *
     * @param topicName name of the topic to create, listen on and publish test records to
     */
    @KafkaListener(id = "myId", topics = "testTopic")
    public void listen(String topicName) {
        // Best-effort topic creation: any failure is logged and the method carries on.
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        // The strategy returns true for records to drop, so only records keyed "foo" reach
        // otherListen. Constant-first equals keeps a record with a null key from throwing a
        // NullPointerException; such a record is simply dropped.
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !"foo".equals(record.key()));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        // Ask the container to publish idle events; the interval is given in milliseconds.
        container.getContainerProperties().setIdleEventInterval(3000L);
        container.start();
        // Publish ten test records to partition 0, alternating the keys "foo" and "bar".
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    /**
     * Batch handler invoked through the adapter wired up in {@link #listen(String)}.
     *
     * @param others the payloads of the batch
     */
    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

    /**
     * Logs how long the container that published the event has been idle.
     *
     * @param event the idle event
     */
    @EventListener
    public void eventHandler(ListenerContainerIdleEvent event) {
        // Uses the class's "log" field; the original referenced an undeclared "logger".
        log.info("No messages received for {} milliseconds", event.getIdleTime());
    }

}

Upvotes: 0

Views: 2107

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

You can receive ListenerContainerIdleEvents when there are no messages left to process; you can use this event to stop the container; you should perform the stop() on a different thread (not the one that publishes the event).

See How to check if Kafka is empty using Spring Kafka?

You can get the partition/offset in several ways.

void otherListen(List<ConsumerRecord<..., ...>> records)

or

void otherListen(List<String> others,
    @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets)

You can specify the starting offset in the

new TopicPartitionOffset(topicName, 0, startOffset);

when creating the container.

EDIT

To stop the container when it is idle, set the idleEventInterval and add an @EventListener method and stop the container.

// Executor used to call stop() on a thread other than the one that publishes the idle event.
TaskExecutor exec = new SimpleAsyncTaskExecutor();

// Receives idle events and stops the container that published them.
@EventListener
void idle(ListenerContainerIdleEvent event) {
    // (logging elided in this snippet)
    log...
    // Hand the stop() call to the executor instead of running it on the event-publishing thread.
    this.exec.execute(() -> event.getContainer(ConcurrentMessageListenerContainer.class).stop());
}

If you add concurrency to your containers, you would need for each child container to go idle before stopping the parent container.

EDIT2

I just added it to the code I wrote for the answer to your other question and it works exactly as expected.

    /**
     * Handles one record from {@code so69134055}, whose payload is the name of a topic: creates
     * that topic, starts a batch listener container on its partition 0 with idle events enabled,
     * and publishes ten test records to it.
     *
     * @param topicName name of the topic to create, listen on and publish test records to
     */
    @KafkaListener(id = "so69134055", topics = "so69134055")
    public void listen(String topicName) {
        // Best-effort topic creation: any failure is logged and the method carries on.
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        // The strategy returns true for records to drop, so only records keyed "foo" reach
        // otherListen. Constant-first equals keeps a record with a null key from throwing a
        // NullPointerException; such a record is simply dropped.
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !"foo".equals(record.key()));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        // Ask the container to publish idle events; the interval is given in milliseconds.
        container.getContainerProperties().setIdleEventInterval(3000L);
        container.setBeanName(topicName + ".container");
        container.start();
        // Publish ten test records to partition 0, alternating the keys "foo" and "bar".
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    /**
     * Batch handler invoked through the adapter that {@code listen} wires to this method;
     * logs the payloads it receives.
     *
     * @param others the payloads of the batch
     */
    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

    /** Executor used to stop an idle container on a thread other than the event-publishing one. */
    TaskExecutor exec = new SimpleAsyncTaskExecutor();

    /**
     * Logs the idle event and stops the container that published it.
     *
     * @param event the idle event
     */
    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        log.info(event.toString());
        // stop() is handed to the executor so it does not run on the thread that publishes the event.
        this.exec.execute(() -> {
            // Wildcard type instead of the raw type; only getBeanName() and stop() are needed.
            ConcurrentMessageListenerContainer<?, ?> container =
                    event.getContainer(ConcurrentMessageListenerContainer.class);
            log.info("stopping container: {}", container.getBeanName());
            container.stop();
        });
    }
[foo.container-0-C-1] Others: [test0, test2, test4, test6, test8]
[foo.container-0-C-1] ListenerContainerIdleEvent [idleTime=5.007s, listenerId=foo.container-0, container=KafkaMessageListenerContainer [id=foo.container-0, clientIndex=-0, topicPartitions=[foo-0]], paused=false, topicPartitions=[foo-0]]
[SimpleAsyncTaskExecutor-1] stopping container: foo.container
[foo.container-0-C-1] [Consumer clientId=consumer-group.for.foo-2, groupId=group.for.foo] Unsubscribed all topics or patterns and assigned partitions
[foo.container-0-C-1] Metrics scheduler closed
[foo.container-0-C-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter
[foo.container-0-C-1] Metrics reporters closed
[foo.container-0-C-1] App info kafka.consumer for consumer-group.for.foo-2 unregistered
[foo.container-0-C-1] group.for.foo: Consumer stopped

Upvotes: 2

Related Questions