Ivan Lymar

Reputation: 2293

Embedded kafka producer test

I'm writing integration tests for a Kafka producer.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");

    @Autowired
    CustomKafkaProducer<String, String> KafkaProducer;

    @Autowired
    KafkaController kafkaController;

    @Test
    public void whenSendMessage_thenConsumeIt() throws InterruptedException {
        KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
        kafkaController.countDownLatch.await();
    }

    @Configuration
    public static class InnerConfig {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(replyConsumerFactory);
            factory.setBatchListener(true);
            return factory;
        }

        @Bean
        KafkaController kafkaController() {
            return new KafkaController();
        }

    }

    public static class KafkaController {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        @KafkaListener(topics = "testtopic")
        public void listen(final String payload) {
            countDownLatch.countDown();
        }
    }

}

The idea is to send a message to the topic and read it back with KafkaController, whose listener counts down a CountDownLatch. The problem is that the latch is never counted down, so the test hangs on await().

CustomKafkaProducer is just a wrapper that uses a regular KafkaTemplate under the hood.

P.S. While debugging, there were several runs where the flow entered the listener and the test passed, so the issue is not caused by a wrong topic name or similar.

Upvotes: 0

Views: 576

Answers (1)

Gary Russell

Reputation: 174554

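You need to set auto.offset.reset=earliest for the consumer. The default is latest, so there is a race condition: if the consumer starts (and gets its partition assigned) after the record is sent, it begins reading from the end of the log and never sees that record. That also explains why the test sometimes passes under the debugger, when the timing happens to favor the consumer.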
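
As a rough illustration of the setting, here is a minimal sketch of a consumer property map with auto.offset.reset set to earliest. The class name EarliestOffsetConsumerProps and its build method are hypothetical and not part of the question's code; the property keys are the standard Kafka consumer configuration names, written as plain strings so the snippet has no dependencies. How the question's replyConsumerFactory is actually built is not shown, so where this map would be plugged in is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the question): builds consumer properties
// for a test consumer that must see records sent before it subscribed.
final class EarliestOffsetConsumerProps {

    private EarliestOffsetConsumerProps() {
    }

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Kafka accepts deserializers as fully qualified class names.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for the group, start from the beginning
        // of the partition instead of the end (the default, "latest").
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

A map like this could be passed to new DefaultKafkaConsumerFactory<>(props), with the broker address taken from embeddedKafka.getEmbeddedKafka().getBrokersAsString(). If the consumer factory comes from Spring Boot auto-configuration instead, the equivalent property is spring.kafka.consumer.auto-offset-reset=earliest.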

Upvotes: 1
