Reputation: 551
I am trying to write an integration test for my Kafka consumer using @SpringBootTest, with Testcontainers providing the underlying infrastructure. My setup looks like this:
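import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty("outdoor.kafka.enableKafkaReading")
public class Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Public and non-final: Spring injects these fields, and the SpEL expressions below read them
    @Value("${topic.name}")
    public String topic;
    @Value("${consumer.group}")
    public String consumerGroup;

    @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.consumerGroup}",
            containerFactory = "containerFactory")
    public void consume(String message) {
        LOGGER.info("Received message: {}", message);
    }
}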
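import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;

@Configuration
public class KafkaConsumerConfig {
    // I actually use custom deserializers for my own entities, using String here for example's sake
    // KafkaProperties carries the spring.kafka.* settings; ObjectMapper is only needed by the custom deserializers
    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties, ObjectMapper objectMapper) {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(null), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(CommonErrorHandler errorHandler,
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // Use the injected consumer factory parameter
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}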
In my test, I want to write something to the topic with a KafkaTemplate and then make sure the listener has been called. Like this:
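import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

@SpringBootTest
@ContextConfiguration(initializers = TestContainersInitializer.class)
class ConsumerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @SpyBean
    private Consumer consumer;

    @Test
    void consume() {
        kafkaTemplate.send("topic-name", "1", "message");
        // Poll once per second for up to 5 seconds until the spied listener has been invoked
        await()
                .pollInterval(Duration.ofSeconds(1))
                .atMost(5, SECONDS)
                .untilAsserted(() -> {
                    verify(consumer).consume(any());
                });
    }
}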
The message is written to the topic correctly: I verified this by reading the topic with a plain KafkaConsumer, and the contents are there. However, the listener does not seem to start up and consume the message. Do you have any idea what is going wrong?
My original setup is more complex, but the simplified variant described above shows the same problem.
I also raised an issue on the Spring Boot project: https://github.com/spring-projects/spring-boot/issues/41009
Upvotes: 0
Views: 281
Reputation: 551
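The problem was that the test class was missing the @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest")
configuration, so the consumer was missing the messages I was writing. Without that setting the consumer falls back to Kafka's default, auto.offset.reset=latest: a consumer group with no committed offsets starts reading at the end of the partition, so a message that is sent before the listener has been assigned its partition is skipped.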
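To make the reasoning concrete, here is a small toy model of the start-position rule. It is not Kafka client code: OffsetResetDemo, AutoOffsetReset and recordsSeen are made-up names for illustration, and the model assumes a brand-new consumer group (no committed offsets) and a single partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only, not Kafka client code. It mimics how a consumer group
// with no committed offset chooses where to start reading a partition.
final class OffsetResetDemo {

    enum AutoOffsetReset { EARLIEST, LATEST }

    // Returns the records a brand-new consumer group would see, given the records
    // already in the partition when the consumer gets its assignment (existing)
    // and the records produced afterwards (producedLater).
    static List<String> recordsSeen(AutoOffsetReset policy,
                                    List<String> existing,
                                    List<String> producedLater) {
        List<String> log = new ArrayList<>(existing);
        log.addAll(producedLater);
        // EARLIEST starts at offset 0; LATEST starts at the end of the log as it
        // was at assignment time, so everything in `existing` is skipped.
        int start = (policy == AutoOffsetReset.EARLIEST) ? 0 : existing.size();
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        // The test sends "message" before the listener has been assigned the partition.
        List<String> sentBeforeAssignment = List.of("message");
        List<String> sentAfterAssignment = List.of();

        // prints []
        System.out.println(recordsSeen(AutoOffsetReset.LATEST, sentBeforeAssignment, sentAfterAssignment));
        // prints [message]
        System.out.println(recordsSeen(AutoOffsetReset.EARLIEST, sentBeforeAssignment, sentAfterAssignment));
    }
}
```

With LATEST the listener only sees what arrives after it is assigned, which matches the behaviour in my test. Setting spring.kafka.consumer.auto-offset-reset=earliest in the test's application properties should have the same effect as the annotation.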
Upvotes: 0