Reputation: 8353
I am trying to develop a Spring Boot application that uses the reactor-kafka library
to react to messages read from a Kafka topic.
I have a configuration class that builds a KafkaReceiver.
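import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class MyConfiguration {
    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        // `consumer` (not shown here) supplies the topic name.
        final ReceiverOptions<String, String> receiverOptions =
            ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    }
}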
Well... and now? With the not-so-reactive spring-kafka library, I can annotate a method
with @KafkaListener and Spring Boot will create a thread for me that listens to a Kafka topic.
Where should I place the KafkaReceiver instead? All the examples I found use the main
method directly, but this is not the Boot way.
I am using Spring Boot 2.1.3 and Reactor-Kafka 1.1.0.
Thanks in advance.
Upvotes: 9
Views: 2817
Reputation: 121177
Since you already have that KafkaReceiver bean, you can do something like this:
@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
    return args -> {
        kafkaReceiver.receive()
            ...
            .subscribe();
    };
}
This ApplicationRunner bean is going to be invoked once the ApplicationContext
is ready. See its JavaDocs for more info.
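For illustration only, here is one way the elided part of the pipeline could be filled in. This is a minimal sketch, not necessarily what your application needs: the class name ReceiverRunnerConfiguration, the logging to standard output, and the choice to acknowledge each record right after printing it are all assumptions made for the example.

```java
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
public class ReceiverRunnerConfiguration {

    @Bean
    public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
        // Spring Boot calls this runner once the ApplicationContext is ready.
        return args -> kafkaReceiver.receive()
                .doOnNext(record -> {
                    // Example processing step (assumed): just print the record.
                    System.out.printf("key=%s value=%s offset=%d%n",
                            record.key(), record.value(), record.offset());
                    // Mark the offset as processed so it becomes eligible for commit.
                    record.receiverOffset().acknowledge();
                })
                .subscribe(
                        record -> { },  // all work already happens in doOnNext
                        error -> System.err.println("Kafka receive failed: " + error));
    }
}
```

Note that subscribe() returns a Disposable immediately rather than blocking, so if you need to stop consuming on shutdown you may want to keep that reference and call dispose() on it.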
Upvotes: 7