Reputation: 538
I integrated Kafka into my Spring project and wrote an integration test using the Testcontainers approach, but the test fails intermittently. It seems to be a problem with the initialisation of the Kafka server.
Here is my code:
    def setupSpec() {
        kafka = new KafkaContainer()
        kafka.start()
        System.setProperty("kafka.consumer.endpoint", kafka.bootstrapServers.replace("PLAINTEXT://", ""))
    }

    def setup() {
        RestAssured.port = port
    }

    def "test profile update events"() {
        given:
        String INPUT_TOPIC = "EventXX"

        when:
        KafkaProducer<String, String> kafkaProducer = createProducer()
        kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
        kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
        kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
        kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()

        then:
        TestUtil.waitFor({ EventConsumer.msgConsumed == true }, 5000)
        kafkaProducer.close()
    }
Interestingly, if I add Thread.sleep(10000) before sending the messages in the test, it always works, but this approach looks dirty to me. How can we make sure that the Kafka server is up and running before any test runs? I tried validating a send/receive round trip (validatekafkaSendRecieve) in setupSpec, but that failed as well. I am pasting the code below:
    def validatekafkaSendRecieve() {
        def started = false
        String INPUT_TOPIC = "kafkaTest"
        def producer = createProducer()
        def consumer = createConsumer(INPUT_TOPIC)
        Thread.sleep(9000)
        while (!started) {
            producer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get()
            started = consumeMessage(INPUT_TOPIC, consumer)
        }
        producer.close()
        consumer.close()
    }
    def consumeMessage(String topic, KafkaConsumer kafkaConsumer) {
        // poll(long) takes a timeout in milliseconds, so this waits at most 3 ms
        def message = kafkaConsumer.poll(3)
        if (message.isEmpty()) {
            return false
        }
        def messageList = message.records(topic).asList()
        // Check the records for this topic rather than the whole batch again
        return messageList != null && !messageList.isEmpty()
    }
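The while loop in validatekafkaSendRecieve has no upper bound, so it never gives up if the broker stays unreachable, and the fixed Thread.sleep delays every run. One possible alternative is a bounded polling helper, similar in spirit to the TestUtil.waitFor call above (whose implementation is not shown). The sketch below is only an illustration: the name waitUntil and its parameters are hypothetical and not part of my project.

```groovy
import java.util.concurrent.TimeUnit

// Hypothetical helper: re-evaluates `condition` every `intervalMs` milliseconds
// until it returns true or `timeoutMs` milliseconds have elapsed.
boolean waitUntil(long timeoutMs, long intervalMs, Closure<Boolean> condition) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (true) {
        if (condition.call()) {
            return true
        }
        // Give up once the deadline has passed; the condition was checked at least once
        if (System.nanoTime() - deadline >= 0) {
            return false
        }
        Thread.sleep(intervalMs)
    }
}
```

With something like this, the send-and-consume round trip could be passed in as the closure with a timeout of, say, 30 seconds, so the setup fails with a clear result instead of hanging.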
Upvotes: 0
Views: 2452
Reputation: 1054
Are you pre-creating the topic you produce to? Try doing that. I'm not sure this is the problem you are facing (logs would be needed to confirm), but when a topic is auto-created, it takes some time for all its partitions to be assigned a leader.
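A minimal sketch of what pre-creating the topic might look like with the Kafka AdminClient, assuming kafka-clients is on the test classpath. The helper name createTopic, the single partition, the replication factor of 1 and the 30-second timeout are my assumptions for a single-broker test container, not something taken from your code.

```groovy
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic

import java.util.concurrent.TimeUnit

// Hypothetical helper: creates the topic up front and blocks until the broker
// acknowledges the creation or the timeout expires.
def createTopic(String bootstrapServers, String topic) {
    def admin = AdminClient.create([(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG): bootstrapServers])
    try {
        // 1 partition, replication factor 1: assumed values for a single test broker
        admin.createTopics([new NewTopic(topic, 1, (short) 1)]).all().get(30, TimeUnit.SECONDS)
    } finally {
        admin.close()
    }
}
```

You could call it in setupSpec right after kafka.start(), for example createTopic(kafka.bootstrapServers, "EventXX"), so the topic exists before the first send.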
Upvotes: 1