Reputation: 185
I'm writing a Kafka integration test for a simple Spring Boot application. The application simply publishes to a Kafka topic.
I am using an Embedded Kafka instance for the test. The test works perfectly fine when run through IntelliJ but fails when I run it via Gradle. It looks as though the latch
countdown never reaches 0 and the test eventually times out.
Producer Config:
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}
Producer:
public class KafkaProducer {

    // Placeholder syntax is needed here; without ${...} the literal string is injected.
    @Value(value = "${kafka.topic-name}")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
            // Block until the broker acknowledges the send.
            future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }
    }

    public String getTopicName() {
        return topicName;
    }
}
Consumer:
@Component
public class KafkaConsumerHelper {

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}
Test:
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {
        String message = createArticle();
        producer.sendMessage(message, topic);
        consumer.getLatch().await();
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }
}
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1
Any idea what is the issue?
Upvotes: 5
Views: 11573
Reputation: 185
For anyone looking at this question in the future, my problem was that I was not using @EmbeddedKafka properly.
The fix was to add the bootstrapServersProperty = "spring.kafka.bootstrap-servers" attribute to the @EmbeddedKafka annotation:
@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
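For readers wondering what that attribute changes: by default the embedded broker publishes its address under the spring.embedded.kafka.brokers system property, and bootstrapServersProperty tells it to publish under a name of your choosing instead, here the one Spring Boot's Kafka clients read. The following is only a plain-Java sketch of that idea with no Spring involved. The helpers startEmbeddedBroker and resolveBootstrapServers and the address 127.0.0.1:54321 are hypothetical stand-ins, and the sketch does not explain why the original fixed-port setup failed only under Gradle.

```java
import java.util.Properties;

public class BootstrapPropertySketch {

    // Property name the embedded broker uses by default for its address.
    static final String DEFAULT_PROPERTY = "spring.embedded.kafka.brokers";

    // Property name the Spring Boot Kafka clients read their bootstrap servers from.
    static final String BOOT_PROPERTY = "spring.kafka.bootstrap-servers";

    // Hypothetical stand-in for the embedded broker: it publishes its
    // address under whichever property name it was configured with.
    static void startEmbeddedBroker(Properties env, String propertyName, String address) {
        env.setProperty(propertyName, address);
    }

    // Hypothetical stand-in for the client configuration: it reads the Boot
    // property and falls back to the value from application.yml.
    static String resolveBootstrapServers(Properties env) {
        return env.getProperty(BOOT_PROPERTY, "localhost:9092");
    }

    public static void main(String[] args) {
        // Without the attribute: the broker address lands under the default
        // name, so the clients keep using the application.yml value.
        Properties withoutAttribute = new Properties();
        startEmbeddedBroker(withoutAttribute, DEFAULT_PROPERTY, "127.0.0.1:54321");
        System.out.println(resolveBootstrapServers(withoutAttribute));

        // With the attribute: the broker address is published under the name
        // the clients read, so they connect to the embedded broker.
        Properties withAttribute = new Properties();
        startEmbeddedBroker(withAttribute, BOOT_PROPERTY, "127.0.0.1:54321");
        System.out.println(resolveBootstrapServers(withAttribute));
    }
}
```

With the attribute set, the brokerProperties entries that pinned the broker to port 9092 are no longer needed, since the clients follow whatever address the embedded broker reports.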
More information in the Kafka Docs.
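Separately from the fix, the test in the question calls await() with no timeout, so a message that never arrives makes the build hang until something else kills it. A bounded await fails quickly instead. This is a minimal stdlib-only sketch of that pattern; the awaitMessage helper and the timeout values are assumptions chosen for illustration, not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedLatchSketch {

    // Waits for the latch up to the given timeout. Returns true if the
    // count reached zero in time, false if the wait timed out.
    static boolean awaitMessage(CountDownLatch latch, long timeout, TimeUnit unit)
            throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        // A simulated listener thread "receives" a record and counts down.
        CountDownLatch delivered = new CountDownLatch(1);
        Thread listener = new Thread(delivered::countDown);
        listener.start();
        System.out.println(awaitMessage(delivered, 2, TimeUnit.SECONDS)); // true

        // Nothing ever counts this latch down, so the wait gives up.
        CountDownLatch neverDelivered = new CountDownLatch(1);
        System.out.println(awaitMessage(neverDelivered, 200, TimeUnit.MILLISECONDS)); // false
    }
}
```

In the test from the question, the same idea would be to assert on the return value, for example assertThat(consumer.getLatch().await(10, TimeUnit.SECONDS), equalTo(true)), where the 10 seconds is an arbitrary choice.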
Upvotes: 8