Reputation: 2293
I'm writing integration tests for a Kafka producer.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");

    @Autowired
    CustomKafkaProducer<String, String> KafkaProducer;

    @Autowired
    KafkaController kafkaController;

    @Test
    public void whenSendMessage_thenConsumeIt() throws InterruptedException {
        KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
        kafkaController.countDownLatch.await();
    }

    @Configuration
    public static class InnerConfig {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(replyConsumerFactory);
            factory.setBatchListener(true);
            return factory;
        }

        @Bean
        KafkaController kafkaController() {
            return new KafkaController();
        }
    }

    public static class KafkaController {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        @KafkaListener(topics = "testtopic")
        public void listen(final String payload) {
            countDownLatch.countDown();
        }
    }
}
The idea is to send a message to the topic and read it back using KafkaController and a CountDownLatch.
The issue is that the CountDownLatch is never counted down, so the test just hangs on await.
CustomKafkaProducer is just a wrapper that uses a regular kafkaTemplate under the hood.
P.S. While debugging, there were several runs in which the flow entered the listener and the test passed, so the issue is not related to wrong topic names or the like.
Upvotes: 0
Views: 576
Reputation: 174554
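You need to set auto.offset.reset=earliest for the consumer. The default is latest, so there is a race condition: if the consumer starts after the record is sent and its group has no committed offset, it begins reading at the end of the partition and never sees that record.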
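As a rough sketch of what that setting looks like in a consumer configuration map (the class name, method name, bootstrap address and group id below are made up for illustration and are not from the question; the property keys are the standard Kafka consumer config names, written as plain strings so the snippet compiles without the Kafka client on the classpath):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the question's code.
public class EarliestOffsetConsumerProps {

    // Builds a consumer property map keyed by standard Kafka consumer config names.
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // When the group has no committed offset, start from the beginning of the
        // partition instead of the end (the default is "latest").
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id, for demonstration only.
        Map<String, Object> props = consumerProps("localhost:9092", "test-group");
        System.out.println(props.get("auto.offset.reset"));
    }
}
```

A map like this would typically be handed to the ConsumerFactory that the listener container factory uses, for example new DefaultKafkaConsumerFactory<>(props). If the consumer factory is instead built from Spring Boot's auto-configured properties, setting spring.kafka.consumer.auto-offset-reset=earliest in the test configuration should have the same effect.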
Upvotes: 1