nope

Reputation: 151

Why is my Kafka listener not working in my unit test?

PROBLEM

I am attempting to unit test a Kafka producer that sends an Integer. When I run my unit test, the console output suggests that my producer and consumer are working correctly: the producer sends a value of zero, the consumer receives it, and adds it to the running total.

sending data = 0
received content = 0
sending total = 0

However, the unit test ultimately fails because the ConsumerRecord polled from the test listener's queue comes back null. My guess is that the listener is not receiving anything at all.

java.lang.AssertionError: 
Expected: a ConsumerRecord with value 0
     but: is null

QUESTION

Is there an error in the way I have defined the container/message listener, or is something more fundamentally wrong with my unit test?

This URL contains the code for both my producer/config and consumer/config; it might be easier to view it there than to paste a huge code section here.

https://github.com/ewingian/RestCalculator/tree/master/src/main/java/com/calculator/kafka
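
For reference, the producer and consumer services are roughly shaped like the sketch below. This is only an approximation inferred from the test and the console output above; the real code is in the linked repo.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Approximation only -- each class lives in its own file in the repo.
@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Integer> kafkaTemplate;

    // Sends the value without a key, so the record key ends up null.
    public void send(Integer data) {
        System.out.println("sending data = " + data);
        kafkaTemplate.send("input", data);
    }
}

@Service
public class KafkaConsumer {

    private int total = 0;

    // Listens on the same topic and keeps a running total.
    @KafkaListener(topics = "input")
    public void receive(Integer data) {
        System.out.println("received content = " + data);
        total += data;
        System.out.println("sending total = " + total);
    }
}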

Unit Test

package com.calculator;

/**
 * Created by ian on 2/9/18.
 */
import com.calculator.kafka.services.KafkaProducer;
import com.calculator.kafka.services.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.*;

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {

    // in case I need to send some integers
    private Integer i1 = 0;
    private Integer i2 = 3;

    private static final String SENDER_TOPIC = "input";

    private List<Integer> l1;

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumer consumer;

    private KafkaMessageListenerContainer<String, Integer> container;

    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);


    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);



    @Before
    public void testTemplate() throws Exception {

        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());


    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testSend() throws InterruptedException {
        // send the message
        producer.send(i1);

        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));

        // Hamcrest matcher to check the key (no key was sent, so it should be null)
        assertThat(received, hasKey(null));
    }
}

Upvotes: 3

Views: 9316

Answers (1)

Gary Russell

Reputation: 174689

You need to use the correct key/value serializers/deserializers. The KafkaTestUtils helpers set them up as Integer keys/String values; you need String keys/Integer values.

This modified version of your test case works...

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {

    // in case I need to send some integers
    private final Integer i1 = 0;

    private final Integer i2 = 3;

    private static final String SENDER_TOPIC = "input";

    private List<Integer> l1;

    private KafkaMessageListenerContainer<String, Integer> container;

    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Before
    public void testTemplate() throws Exception {

        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());

    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testSend() throws InterruptedException {
        // send the message
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        ProducerFactory<String, Integer> pf = new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<String, Integer> template = new KafkaTemplate<>(pf);
        template.send(SENDER_TOPIC, i1);

        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));

        // Hamcrest matcher to check the key (no key was sent, so it should be null)
        assertThat(received, hasKey(null));

        System.out.println(received);
    }

}

EDIT

In response to your comment:

Your producer is not wired up to talk to the embedded (test) broker. This works fine for me:

@Configuration
public class KafkaProducerConfig {

    @Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + ":localhost:9092}")
    private String bootstrapServer;

    @Bean
    public ProducerFactory<String, Integer> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Integer> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

and

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {

    // in case I need to send some integers
    private final Integer i1 = 42;

    private final Integer i2 = 3;

    private static final String SENDER_TOPIC = "input";

    private List<Integer> l1;

    @Autowired
    private KafkaProducer producer;

    private KafkaMessageListenerContainer<String, Integer> container;

    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Before
    public void testTemplate() throws Exception {

        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());

    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testSend() throws InterruptedException {
        // send the message
        producer.send(i1);

        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));

        // Hamcrest matcher to check the key (no key was sent, so it should be null)
        assertThat(received, hasKey(null));

        System.out.println(received);
    }

}

and

ConsumerRecord(topic = input, partition = 1, offset = 0, CreateTime = 1519487134283, checksum = 866641474, serialized key size = -1, serialized value size = 4, key = null, value = 42)
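
Note that the consumer/config in your application needs the same treatment if it should also receive from the embedded broker during the test. A minimal sketch, assuming a conventional @KafkaListener-based setup (class, bean, and group names here are illustrative, not taken from the repo):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.rule.KafkaEmbedded;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Same pattern as the producer config above: falls back to localhost:9092 outside the test.
    @Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + ":localhost:9092}")
    private String bootstrapServer;

    @Bean
    public ConsumerFactory<String, Integer> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "calculator");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Integer> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Integer> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

The key point is the @Value fallback: inside the test, the spring.embedded.kafka.brokers property is populated by the KafkaEmbedded rule, and outside the test it falls back to localhost:9092.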

Upvotes: 5
