Daniel Pop

Reputation: 551

@KafkaListener with containerFactory is not triggered in @SpringBootTest

I am trying to write an integration test for my Kafka consumer using @SpringBootTest, with Testcontainers providing the underlying infrastructure. My setup looks like this:

@Component
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Both fields are set in the constructor (omitted here).
    public final String topic;
    public final String consumerGroup;

    @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.consumerGroup}",
            containerFactory = "containerFactory")
    public void consume(String message) {
        LOGGER.info("Received message: {}", message);
    }
}

@Configuration
public class KafkaConsumerConfig {

    // kafkaProperties is an injected KafkaProperties instance (declaration omitted here).
    // I actually use custom deserializers for my own entities, using String here for example's sake
    @Bean
    public ConsumerFactory<String, String> consumerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(null), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(CommonErrorHandler errorHandler,
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

In my test, I want to write something to the topic with a KafkaTemplate and then make sure the listener has been called. Like this:

@SpringBootTest
@ContextConfiguration(initializers = TestContainersInitializer.class)
class ConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Consumer consumer;

    @Test
    void consume() {
        Instant now = Instant.now();
        kafkaTemplate.send("topic-name", "1", "message");

        await()
            .atMost(5, SECONDS)
            .untilAsserted(() -> {
                // assertions that the listener was invoked (omitted here)
            });
    }
}

My message is written to the topic correctly; I verified this by reading the topic with a standard KafkaConsumer. However, the listener never consumes the message. Do you have any idea what is going wrong?

My original setup is more complex, so I also tried running the simplified variant described above, and it still doesn't work.

Some references I consulted:

  1. https://testcontainers.com/guides/testing-spring-boot-kafka-listener-using-testcontainers/
  2. https://www.atomicjar.com/2023/06/testing-kafka-applications-with-testcontainers/

I also raised an issue on the Spring Boot project: https://github.com/spring-projects/spring-boot/issues/41009

Upvotes: 0

Views: 281

Answers (1)

Daniel Pop

Reputation: 551

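The problem was that the test class was missing the @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest") configuration. Kafka's default for auto.offset.reset is latest, so a consumer group with no committed offsets starts reading at the end of the partition. The listener was therefore skipping the message I had written before it was assigned its partition.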
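
To see why the default setting loses the message, here is a toy model in plain Java. It is only a sketch, not Kafka client code: the class OffsetResetDemo and the method readAsNewGroup are made up for illustration, and the list stands in for a single partition. It assumes the consumer group has no committed offset and that the message is already in the log by the time the listener gets its partition, which seems to be what happened in my test.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one Kafka partition; this is NOT the Kafka client API. */
public class OffsetResetDemo {

    /**
     * Returns the records a brand-new consumer group would read when it joins
     * a partition that already holds {@code log}, for the given reset policy.
     * Any value other than "earliest" is treated like "latest" in this sketch.
     */
    static List<String> readAsNewGroup(List<String> log, String autoOffsetReset) {
        // With no committed offset, the reset policy decides the start position:
        // "earliest" starts at offset 0, "latest" starts at the end of the log.
        int start = autoOffsetReset.equals("earliest") ? 0 : log.size();
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // The test sends the message before the listener has its partition assigned.
        log.add("message");

        System.out.println("latest:   " + readAsNewGroup(log, "latest"));
        System.out.println("earliest: " + readAsNewGroup(log, "earliest"));
    }
}
```

Running main prints an empty list for latest and [message] for earliest, which matches what I saw: the message was on the topic, but the listener never received it until the reset policy was changed.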

Upvotes: 0
