Reputation: 682
I've read the documentation about receiving messages:
You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.
However I can't quite get it to work. I'm using Spring Boot 2.1.2 and have potentially oversalted the Spring Soup with Beans that do more harm than they benefit me, so I'd like to understand how it is supposed to work, so I can check where I strayed from the path of glory.
If I understand the documentation correctly, it could be sufficient to have a MessageListenerContainer
configured, e.g. this here:
@Configuration
public class MyKafkaConfiguration {
@Bean
public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset("spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory, cProps);
result.setupMessageListener(MessageListener<String, String) System.out::println);
return result;
}
}
This boots up without exception, but does not seem to actually listen to any messages on the broker.
From what I get from the usual flow with annotations, there needs to be someone who registers the listener in the form of an KafkaListenerEndpoint
to the KafkaListenerEndpointRegistry
.
This is automatically done by the KafkaListenerAnnotationBeanPostPorcessor
for all methods annotated by @KafkaListener
but how is this supposed to work in the case, where I want to take the path
by configuring a MessageListenerContainer and providing a message listener
instead of
using the @KafkaListener annotation
I don't quite get it. Also there is no method within the KafkaAutoConfiguration
(which Spring Boot provides) which, e.g. takes a List<MessageListenerContainer>
and automatically registeres all of them to the registry, so this is not surprising.
But how was it supposed to work in the first place, as the documentation suggests? Did I misunderstand that part? Can someone please enlight me?
Upvotes: 2
Views: 6853
Reputation: 174554
I just copied your bean into a new boot app and it works just fine.
The endpoint registry is only for @KafkaListener
containers, because they are not registered as beans in the application context (the registry is the bean).
@SpringBootApplication
public class So57628247Application {
private static final int MessageListener = 0;
public static void main(String[] args) {
SpringApplication.run(So57628247Application.class, args);
}
@Bean
public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset(
"spring-kafka-stackoverflow-questions", /* partition */ 0, /* Offset */ 0L));
KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory,
cProps);
result.setupMessageListener((MessageListener<String, String>) System.out::println);
return result;
}
@Bean
public NewTopic topic() {
return new NewTopic("spring-kafka-stackoverflow-questions", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("spring-kafka-stackoverflow-questions", "foo");
};
}
}
and
ConsumerRecord(topic = spring-kafka-stackoverflow-questions, partition = 0, offset = 0, CreateTime = 1566573407373, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
Upvotes: 3
Reputation: 121272
The next section in that docs says:
When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces
Then there is a section bellow Using KafkaMessageListenerContainer
:
To assign a
MessageListener
to a container, you can use theContainerProps.setMessageListener
method when creating the Container. The following example shows how to do so:ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
And there is a full sample in the beginning of that Reference: https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#a-very-very-quick-example
@Test
public void testAutoCommit() throws Exception {
logger.info("Start auto");
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic(topic1);
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
logger.info("Stop auto");
}
Upvotes: 2