Shankar
Shankar

Reputation: 2835

@KafkaListener in Unit test case does not consume from the container factory

I wrote a JUnit test case to test the code in the "With Java Configuration" lesson in the Spring Kafka docs. (https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration). The onedifference is that I am using an Embedded Kafka Server in the class, instead of a localhost server. I am using Spring Boot 2.0.2 and its Spring-Kafka dependency.

While running this test case, I see that the Consumer is not reading the message from the topic and the "assertTrue" check fails. There are no other errors.

@RunWith(SpringRunner.class)
public class SpringConfigSendReceiveMessage {

    public static final String DEMO_TOPIC =  "demo_topic";
    @Autowired
    private Listener listener;

    @Test
    public void testSimple() throws Exception {
        template.send(DEMO_TOPIC, 0, "foo");
        template.flush();
        assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
    }

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaEmbedded kafkaEmbedded() {
            return new KafkaEmbedded(1, true, 1, DEMO_TOPIC);
        }

        @Bean
        public ConsumerFactory<Integer, String> createConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(createConsumerFactory());
            return factory;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    }       

}

class Listener {
    public final CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = DEMO_TOPIC)
    public void listen1(String foo) {
        this.latch.countDown();
    }
}

I think that this is because the @KafkaListener is using some wrong/default setting when reading from the topic. I dont see any errors in the logs.

Is this unit test case correct? How can i find the object that is created for the KafkaListener annotation and see which Kafka broker it consumes from? Any inputs will be helpful. Thanks.

Upvotes: 4

Views: 5994

Answers (2)

Shankar
Shankar

Reputation: 2835

The answer by @gary-russell is the best solution. Another way to resolve this issue was to delay the message send step by some time. This will enable the consumer to be ready. The following is also a correct solution.

Lesson learned - For unit testing Kafka consumers, either consume all the messages in the test case, or ensure that the Consumer is ready before Producer sends the message.

@Test
public void testSimple() throws Exception {
    Thread.sleep(1000);
    template.send(DEMO_TOPIC, 0, "foo");
    template.flush();
    assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}

Upvotes: 1

Gary Russell
Gary Russell

Reputation: 174689

The message is sent before the consumer starts.

By default, new consumers start consuming at the end of the topic.

Add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Upvotes: 7

Related Questions