Reputation: 3874
I'm writing a small PoC using Spring and Spring Kafka. My goal is to have both a producer and a consumer, writing to (resp. reading from) the same topic.
I'm having a weird situation where:
Below is my code; it is very similar to the examples in the documentation. More precisely, the problem comes from the fact that the beans in KafkaConsumerConfiguration are not being created by Spring (i.e. the methods constructing them are never called).
KafkaProducerConfiguration.java
@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
MessageSender.java
@Component
public class MessageSender {

    private static final Logger log = Logger.getLogger(MessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void onConstruct() throws InterruptedException {
        log.info("Sending messages...");
        for (int i = 0; i < 100; ++i) {
            kafkaTemplate.send("mytopic", "this is a message");
            Thread.sleep(1000);
        }
        kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
        log.info("Done sending messages");
    }
}
KafkaConsumerConfiguration.java
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
        return props;
    }
}
MyMessageListener.java
@Service
public class MyMessageListener {

    private static final Logger log = Logger.getLogger(MyMessageListener.class);

    @PostConstruct
    public void onConstruct() {
        log.info("Message listener started");
    }

    @KafkaListener(topics = "mytopic")
    public void onMessageReceived(String message) {
        log.info("Got message: " + message);
    }
}
Here's the log generated by the application, for reference: https://pastebin.com/BY783jiL. As you can see, the consumer beans are not being created (otherwise there would be a block ConsumerConfig values: ...).
Here are a few things I tried without success:
- Explicitly setting containerFactory = "myBeanName" (on the MyMessageListener.onMessageReceived method)
- Renaming KafkaConsumerConfiguration to something else
- Adding another @Bean in my KafkaConsumerConfiguration to see if it would get created (it does)

Versions: Spring Boot 1.5.9, Spring-Kafka 1.1.7.
I've been tearing my hair out for a few hours now, any help appreciated.
Thanks!
Upvotes: 0
Views: 939
Reputation: 174729
kafkaTemplate.send("mytopic", "this is a message");
You should NEVER start interacting with external services in a @PostConstruct method - you need to wait for the application to be built before doing so.
Implement SmartLifecycle, return true for isAutoStartup and move that code to start().
Or implement ApplicationListener<ContextRefreshedEvent> and do the sends when you get the event.
Either way will ensure that the application is ready.
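As a rough sketch of the ApplicationListener option, MessageSender from the question could be reshaped as below. This is only an illustration under some assumptions: constructor injection is used instead of the field injection in the question, and the Thread.sleep between sends is dropped because onApplicationEvent cannot throw the checked InterruptedException. The topic name and message are taken from the question.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageSender implements ApplicationListener<ContextRefreshedEvent> {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public MessageSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Called when the context publishes ContextRefreshedEvent, i.e. at the end
    // of context refresh, after the singleton beans have been instantiated.
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        for (int i = 0; i < 100; ++i) {
            kafkaTemplate.send("mytopic", "this is a message");
        }
        kafkaTemplate.flush();
    }
}
```

Nothing runs in @PostConstruct any more, so creating this bean no longer delays the creation of the consumer beans. Note that the loop still runs on the thread that publishes the event, and that in an application with parent and child contexts the event may be delivered more than once.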
Upvotes: 2
Reputation: 3874
Just found the issue. MessageSender.onConstruct is actually taking a lot of time to execute (100 seconds), and in the meantime it is preventing Spring from creating the other beans.
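The effect can be reproduced with a toy model in plain Java. This is not Spring's actual code, just a sketch assuming that beans are created one after another on a single thread; SequentialInitDemo, createAll and sleepQuietly are hypothetical names, and the 100 second delay is shortened to 0.3 seconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SequentialInitDemo {

    /** Creates every "bean" in registration order on the calling thread. */
    static List<String> createAll(Map<String, Runnable> definitions) {
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, Runnable> entry : definitions.entrySet()) {
            events.add("creating " + entry.getKey());
            entry.getValue().run(); // a slow init callback blocks everything after it
            events.add("created " + entry.getKey());
        }
        return events;
    }

    /** Sleeps without forcing callers to handle the checked exception. */
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<String, Runnable> definitions = new LinkedHashMap<>();
        // Stands in for MessageSender.onConstruct and its long send loop.
        definitions.put("messageSender", () -> sleepQuietly(300));
        // Stands in for the consumer beans, which are cheap to create.
        definitions.put("consumerFactory", () -> { });

        long start = System.nanoTime();
        List<String> events = createAll(definitions);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        events.forEach(System.out::println);
        System.out.println("total: " + elapsedMillis + " ms");
    }
}
```

In this model "consumerFactory" cannot start being created until the slow callback of "messageSender" has returned, which matches the missing ConsumerConfig block in the log while the sender was still looping.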
Upvotes: 0