Reputation: 85
I have a Spring Boot project that has a Kafka listener that I want to test using Embedded Kafka. I have the Kafka Listener log out the message "record received". Which will only be be logged out if I add a Thread.sleep(1000)
to the start of the method.
Test class:
@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {
private static final String TOPIC = "my-topic";
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Test
void testSendEvent() throws ExecutionException, InterruptedException {
// Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
Producer<Integer, String> producer = configureProducer();
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
producer.send(producerRecord).get();
producer.close();
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
I don't want to use the fickle Thread.sleep()
The test is obviously executing before some setup processes have completed. I clearly need to wait on something, but I am not sure what nor how to do it.
Using:
Upvotes: 7
Views: 8214
Reputation: 99
You can use this small library for testing. All output records will be collected to blocking queue and you can poll them with timout:
@OutputQueue(topic = TOPIC_OUT, partitions = 1)
private BlockingQueue<ConsumerRecord<String, String>> consumerRecords;
@Test
void shouldFilterRecordWithoutHeader() throws ExecutionException, InterruptedException, TimeoutException {
final String messageIn = "hello world";
try (var producer = producer()) {
producer.send(new ProducerRecord<>(TOPIC_IN, messageIn)).get(5, TimeUnit.SECONDS);
}
ConsumerRecord<String, String> record = consumerRecords.poll(5, TimeUnit.SECONDS);
Assertions.assertThat(record).isNotNull();
}
Upvotes: 0
Reputation: 174554
Add an @EventListener
bean to the test context and (for example) count down a CountDownLatch
when a ConsumerStartedEvent
is received; then in the test
assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#events
and
https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption
Or add a ConsumerRebalanceListener
and wait for partition assignment.
Upvotes: 7
Reputation: 2819
I clearly need to wait on something, but I am not sure what nor how to do it.
You need to use a different method to give Kafka
time to process and route the message ...
Look at this line ...
ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
When testing Kafka listeners we always specify a poll delay. This is because your message is given to kafka, which will then process it in another thread. And you need to wait for it.
Here's how it looks in context of the code its used in.
class UserKafkaProducerTest {
@Test
void testWriteToKafka() throws InterruptedException, JsonProcessingException {
// Create a user and write to Kafka
User user = new User("11111", "John", "Wick");
producer.writeToKafka(user);
// Read the message (John Wick user) with a test consumer from Kafka and assert its properties
ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
assertNotNull(message);
assertEquals("11111", message.key());
User result = objectMapper.readValue(message.value(), User.class);
assertNotNull(result);
assertEquals("John", result.getFirstName());
assertEquals("Wick", result.getLastName());
}
}
This is a code piece from this article, which makes stuff clear.
Upvotes: 0