Reputation: 151
PROBLEM
I am attempting to unit test a Kafka producer that sends an Integer. When running my unit test I can see output in the console suggesting that my producer and consumer are working correctly. That is, the producer is sending a value of zero, the consumer received zero, and totaled that integer with the running total.
sending data = 0
received content = 0
sending total = 0
However the unit test ultimately fails when the test ConsumerRecord comes back with a null entry. I guess the listener is not working at all.
java.lang.AssertionError:
Expected: a ConsumerRecord with value 0
but: is null
QUESTION
Is there an error in the way I have defined the container/messageListener? Or something more fundamentally wrong with my unit test?
This URL contains the code of both my producer/config and consumer/config. It might be easier to view it there rather than making the code section huge on here.
https://github.com/ewingian/RestCalculator/tree/master/src/main/java/com/calculator/kafka
Unit Test
package com.calculator;
/**
* Created by ian on 2/9/18.
*/
import com.calculator.kafka.services.KafkaProducer;
import com.calculator.kafka.services.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.*;
/**
* Test that sends an Integer through the autowired {@code KafkaProducer} and expects a
* test-local listener container, attached to the embedded broker, to receive it on the
* "input" topic.
*
* NOTE(review): the consumer properties come straight from KafkaTestUtils.consumerProps and
* no key/value deserializers are overridden here - confirm they match the
* String key / Integer value types this test declares.
* NOTE(review): nothing in this class points the autowired producer at the embedded broker -
* confirm which bootstrap servers the application's producer configuration uses.
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {
// in case I need to send some integers
private Integer i1 = 0;
// i2 and l1 are not used by any method in this class
private Integer i2 = 3;
// topic the embedded broker is created with and the test listener subscribes to
private static final String SENDER_TOPIC = "input";
private List<Integer> l1;
// application producer under test (injected from the Spring Boot context)
@Autowired
private KafkaProducer producer;
// application consumer; injected but never referenced by this test
@Autowired
private KafkaConsumer consumer;
// test-side listener container, created fresh in the @Before method
private KafkaMessageListenerContainer<String, Integer> container;
// records captured by the test listener, handed over to the test thread
private BlockingQueue<ConsumerRecord<String, Integer>> records;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
// embedded Kafka rule shared by all tests in the class, created with the "input" topic
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
/**
* Builds and starts a listener container on the "input" topic that pushes every received
* record into {@link #records}, then blocks until partitions have been assigned.
*/
@Before
public void testTemplate() throws Exception {
// set up the Kafka consumer properties
// (group "test-group"; second argument "false" is presumably auto-commit - verify)
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
// set the topic that needs to be consumed
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, Integer>() {
@Override
public void onMessage(ConsumerRecord<String, Integer> record) {
LOGGER.debug("test-listener received message='{}'", record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
/**
* Stops the listener container started in the @Before method.
*/
@After
public void tearDown() {
// stop the container
container.stop();
}
/**
* Sends {@code i1} through the application producer and asserts that the test listener
* captured a record with that value and a null key.
*/
@Test
public void testSend() throws InterruptedException {
// send the message
producer.send(i1);
// check that the message was received
// (poll returns null if nothing arrives within 10 seconds)
ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
// Hamcrest Matchers to check the value
assertThat(received, hasValue(i1));
// Hamcrest matcher (KafkaMatchers.hasKey) to check that the record has no key
assertThat(received, hasKey(null));
}
}
Upvotes: 3
Views: 9316
Reputation: 174689
You need to use the correct key/value serializers/deserializers. `KafkaTestUtils` (KTU) sets them up for Integer/String respectively; you need String/Integer.
This modified version of your test case works...
/**
 * Round-trip test against the embedded broker: an {@code Integer} is published to the
 * {@code "input"} topic with a test-local {@link KafkaTemplate} and must be picked up by a
 * test-local listener container.
 *
 * <p>Both sides explicitly configure String keys and Integer values, overriding whatever
 * {@code KafkaTestUtils} puts into the property maps by default.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {

    // in case I need to send some integers
    private final Integer i1 = 0;
    private final Integer i2 = 3;

    /** Topic created on the embedded broker and used for both sending and listening. */
    private static final String SENDER_TOPIC = "input";

    private List<Integer> l1;

    /** Listener container created in {@link #testTemplate()}; null if setup failed early. */
    private KafkaMessageListenerContainer<String, Integer> container;

    /** Records captured by the listener thread, handed over to the test thread. */
    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    /**
     * Starts a listener container on {@link #SENDER_TOPIC} that stores every received record
     * in {@link #records}, and waits until its partitions are assigned.
     *
     * @throws Exception if the container does not get its partitions assigned
     */
    @Before
    public void testTemplate() throws Exception {
        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        // the record type is <String, Integer>, so the deserializers must say so explicitly
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProperties);
        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();
        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });
        // start the container and underlying message listener
        container.start();
        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    }

    /**
     * Stops the listener container, if one was created.
     */
    @After
    public void tearDown() {
        // @After still runs when @Before fails; without the guard an early setup failure
        // would be reported together with an unrelated NullPointerException from here.
        if (container != null) {
            container.stop();
        }
    }

    /**
     * Publishes {@code i1} with a String/Integer producer and asserts that the listener
     * receives a record carrying that value and no key.
     *
     * @throws Exception if the wait for the record is interrupted or the producer factory
     *                   cannot be shut down
     */
    @Test
    public void testSend() throws Exception {
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        // mirror the consumer side: String keys, Integer values
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        DefaultKafkaProducerFactory<String, Integer> pf = new DefaultKafkaProducerFactory<>(producerProps);
        try {
            KafkaTemplate<String, Integer> template = new KafkaTemplate<>(pf);
            // send the message
            template.send(SENDER_TOPIC, i1);
            // check that the message was received (poll yields null after 10 seconds of silence)
            ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
            // Hamcrest matcher to check the value
            assertThat("record received on topic '" + SENDER_TOPIC + "' within 10 seconds",
                    received, hasValue(i1));
            // Hamcrest matcher to check that the record was sent without a key
            assertThat("key of record received on topic '" + SENDER_TOPIC + "'",
                    received, hasKey(null));
            System.out.println(received);
        } finally {
            // the factory owns the underlying Kafka producer; release it even when an
            // assertion fails so the test does not leak the producer and its threads
            pf.destroy();
        }
    }
}
EDIT
In response to your comment below...
Your producer is not wired up to talk to the embedded (test) broker. This works fine for me:
/**
 * Spring configuration that exposes the producer factory and {@link KafkaTemplate} for
 * String-keyed, Integer-valued records.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Broker address list. Resolved from the property named by
     * {@code KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS} when that property is set,
     * otherwise {@code localhost:9092}.
     */
    @Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + ":localhost:9092}")
    private String bootstrapServer;

    /**
     * @return a producer factory configured with the resolved broker list and
     *         String/Integer serializers
     */
    @Bean
    public ProducerFactory<String, Integer> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerSettings());
    }

    /**
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Integer> kafkaTemplate() {
        ProducerFactory<String, Integer> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /** Assembles the raw Kafka producer properties used by {@link #producerFactory()}. */
    private Map<String, Object> producerSettings() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        return settings;
    }
}
and
/**
 * End-to-end test of the application's {@code KafkaProducer}: the autowired producer sends an
 * {@code Integer}, and a test-local listener container subscribed to the {@code "input"} topic
 * on the embedded broker must receive it.
 *
 * <p>The listener side explicitly configures a String key deserializer and an Integer value
 * deserializer, overriding whatever {@code KafkaTestUtils.consumerProps} supplies.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {

    // in case I need to send some integers
    private final Integer i1 = 42;
    private final Integer i2 = 3;

    /** Topic created on the embedded broker and consumed by the test listener. */
    private static final String SENDER_TOPIC = "input";

    private List<Integer> l1;

    /** Application producer under test, injected from the Spring Boot context. */
    @Autowired
    private KafkaProducer producer;

    /** Listener container created in {@link #testTemplate()}; null if setup failed early. */
    private KafkaMessageListenerContainer<String, Integer> container;

    /** Records captured by the listener thread, handed over to the test thread. */
    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    /**
     * Starts a listener container on {@link #SENDER_TOPIC} that stores every received record
     * in {@link #records}, and waits until its partitions are assigned.
     *
     * @throws Exception if the container does not get its partitions assigned
     */
    @Before
    public void testTemplate() throws Exception {
        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        // the record type is <String, Integer>, so the deserializers must say so explicitly
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProperties);
        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();
        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });
        // start the container and underlying message listener
        container.start();
        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    }

    /**
     * Stops the listener container, if one was created.
     */
    @After
    public void tearDown() {
        // @After still runs when @Before fails; without the guard an early setup failure
        // would be reported together with an unrelated NullPointerException from here.
        if (container != null) {
            container.stop();
        }
    }

    /**
     * Sends {@code i1} through the application producer and asserts that the listener
     * receives a record carrying that value and no key.
     *
     * @throws InterruptedException if the wait for the record is interrupted
     */
    @Test
    public void testSend() throws InterruptedException {
        // send the message
        producer.send(i1);
        // check that the message was received (poll yields null after 10 seconds of silence)
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest matcher to check the value
        assertThat("record received on topic '" + SENDER_TOPIC + "' within 10 seconds",
                received, hasValue(i1));
        // Hamcrest matcher to check that the record was sent without a key
        assertThat("key of record received on topic '" + SENDER_TOPIC + "'",
                received, hasKey(null));
        System.out.println(received);
    }
}
and
ConsumerRecord(topic = input, partition = 1, offset = 0, CreateTime = 1519487134283, checksum = 866641474, serialized key size = -1, serialized value size = 4, key = null, value = 42)
Upvotes: 5