tilois

Reputation: 682

Configure listener programmatically instead of using annotations

I've read the documentation about receiving messages:

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

However, I can't quite get it to work. I'm using Spring Boot 2.1.2 and have potentially oversalted the Spring Soup with beans that do more harm than good, so I'd like to understand how it is supposed to work and check where I strayed from the path of glory.

If I understand the documentation correctly, it should be sufficient to have a MessageListenerContainer configured, e.g. like this:

@Configuration
public class MyKafkaConfiguration {
  @Bean
  public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset("spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
    KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory, cProps);
    result.setupMessageListener((MessageListener<String, String>) System.out::println);
    return result;
  }
}

This boots up without an exception, but does not seem to actually listen for any messages on the broker. From what I gather from the usual flow with annotations, something needs to register the listener, in the form of a KafkaListenerEndpoint, with the KafkaListenerEndpointRegistry.

This is done automatically by the KafkaListenerAnnotationBeanPostProcessor for all methods annotated with @KafkaListener, but how is this supposed to work in the case where I want to take the path

by configuring a MessageListenerContainer and providing a message listener

instead of

using the @KafkaListener annotation

I don't quite get it. Also, there is no method within the KafkaAutoConfiguration (which Spring Boot provides) that, e.g., takes a List<MessageListenerContainer> and automatically registers all of them with the registry, so this is not surprising.

But how was it supposed to work in the first place, as the documentation suggests? Did I misunderstand that part? Can someone please enlighten me?

Upvotes: 2

Views: 6853

Answers (2)

Gary Russell

Reputation: 174554

I just copied your bean into a new boot app and it works just fine.

The endpoint registry is only for @KafkaListener containers, because they are not registered as beans in the application context (the registry is the bean).


@SpringBootApplication
public class So57628247Application {

    public static void main(String[] args) {
        SpringApplication.run(So57628247Application.class, args);
    }

    @Bean
    public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset(
                "spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
        KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory,
                cProps);
        result.setupMessageListener((MessageListener<String, String>) System.out::println);
        return result;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("spring-kafka-stackoverflow-questions", 1, (short) 1);
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("spring-kafka-stackoverflow-questions", "foo");
        };
    }

}

and it prints:

ConsumerRecord(topic = spring-kafka-stackoverflow-questions, partition = 0, offset = 0, CreateTime = 1566573407373, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
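
To make the distinction above concrete, here is a toy model in plain Java with no Spring involved. Every class name in it is made up for illustration and none of them are Spring's types. It assumes only what is described above, plus the fact that the application context starts lifecycle beans that have auto-startup enabled, which listener containers do by default. The registry is a bean that starts the containers it owns, while a container declared as a bean is started by the context directly.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

    /** Simplified stand-in for a lifecycle-aware bean (hypothetical, not Spring's interface). */
    interface ToyLifecycle {
        void start();
        boolean isRunning();
    }

    /** Stand-in for a listener container. */
    static class ToyContainer implements ToyLifecycle {
        private boolean running;
        @Override public void start() { running = true; }
        @Override public boolean isRunning() { return running; }
    }

    /** Stand-in for the endpoint registry: a bean that owns containers which are not beans. */
    static class ToyRegistry implements ToyLifecycle {
        private final List<ToyContainer> containers = new ArrayList<>();
        private boolean running;
        void register(ToyContainer container) { containers.add(container); }
        @Override public void start() {
            containers.forEach(ToyContainer::start);
            running = true;
        }
        @Override public boolean isRunning() { return running; }
    }

    /** Stand-in for the application context: starts every lifecycle bean it holds. */
    static class ToyContext {
        private final List<ToyLifecycle> beans = new ArrayList<>();
        void addBean(ToyLifecycle bean) { beans.add(bean); }
        void refresh() { beans.forEach(ToyLifecycle::start); }
    }

    /** Returns {annotatedRunning, declaredRunning} after the context has started. */
    static boolean[] run() {
        ToyContainer annotated = new ToyContainer(); // created for an annotated method
        ToyContainer declared = new ToyContainer();  // declared as a bean by hand

        ToyRegistry registry = new ToyRegistry();
        registry.register(annotated);                // only the annotated one needs the registry

        ToyContext context = new ToyContext();
        context.addBean(registry);                   // the registry is the bean
        context.addBean(declared);                   // the container itself is the bean
        context.refresh();

        return new boolean[] {annotated.isRunning(), declared.isRunning()};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("annotated running: " + result[0] + ", declared running: " + result[1]);
    }
}
```

Both containers end up running in this model, which is why a container declared as a bean, like the one in the question, needs no registration with the registry.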

Upvotes: 3

Artem Bilan

Reputation: 121272

The next section in those docs says:

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces

Then, further below, there is a section called Using KafkaMessageListenerContainer:

To assign a MessageListener to a container, you can use the ContainerProps.setMessageListener method when creating the Container. The following example shows how to do so:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

And there is a full sample at the beginning of that reference: https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#a-very-very-quick-example

@Test
public void testAutoCommit() throws Exception {
    logger.info("Start auto");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
            latch.countDown();
        }

    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();
    Thread.sleep(1000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate();
    template.setDefaultTopic(topic1);
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    logger.info("Stop auto");

}
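
The latch technique used in that test can be looked at in isolation. Below is a minimal plain-Java sketch of it, with no Kafka or Spring involved. The names LatchSketch and deliverAndAwait are hypothetical, a single-thread executor stands in for the container's consumer thread, and a Consumer<String> stands in for the message listener. The calling thread blocks on the latch until the listener has seen every value or the timeout expires.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchSketch {

    /**
     * Hands each value to a listener on a background thread and reports
     * whether all of them arrived before the timeout.
     */
    static boolean deliverAndAwait(List<String> values, List<String> received) {
        CountDownLatch latch = new CountDownLatch(values.size());

        // Stand-in for the message listener: record the value, then count down.
        Consumer<String> listener = value -> {
            received.add(value);
            latch.countDown();
        };

        // Stand-in for the container's consumer thread.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            for (String value : values) {
                consumerThread.submit(() -> listener.accept(value));
            }
            // Block the calling thread until every value was seen or time runs out.
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            consumerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> received = new CopyOnWriteArrayList<>();
        boolean allArrived = deliverAndAwait(List.of("foo", "bar", "baz", "qux"), received);
        System.out.println("all arrived: " + allArrived + ", received: " + received);
    }
}
```

Waiting on a latch like this avoids guessing how long delivery takes, whereas the fixed Thread.sleep in the test only gives the container time to start.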

Upvotes: 2
