Dominika
Dominika

Reputation: 185

Embedded Kafka integration test - consumer never completes

I'm writing a Kafka integration test for a simple Spring Boot application. The application simple publishes to a Kafka topic.

I am using an Embedded Kafka instance for the test. The test works perfectly fine when run through Intellij but fails when I run it via gradle. It looks as thought the latch countdown never reaches 0 and the test eventually times out.

Producer Config:

public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}

Producer:

public class KafkaProducer {

    @Value(value = "kafka.topic-name")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
           future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }

    }

    public String getTopicName() {
        return topicName;
    }

Consumer:

@Component
public class KafkaConsumerHelper {
    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {

        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}

Test:

@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;


    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {

        String message = createArticle();

        producer.sendMessage(message, topic);
        consumer.getLatch().await();

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1

Any idea what is the issue?

Upvotes: 5

Views: 11573

Answers (1)

Dominika
Dominika

Reputation: 185

For anyone looking at this question in the future, my problem was that I was not using @EmbeddedKafka properly.

The fix was to add bootstrapServersProperty = "spring.kafka.bootstrap-servers" property into the @EmbeddedKafka annotation.

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")

More information in the Kafka Docs.

Upvotes: 8

Related Questions