pannu
pannu

Reputation: 538

Kafka test container flaky tests

Integrated kafka into my spring project. Have written an integration using TestContainer approach but tests fails from time to time. Seems like some problem with the initialisation of kafka server.

Here is my code below

def setupSpec() {
    kafka = new KafkaContainer()
    kafka.start()
    System.setProperty("kafka.consumer.endpoint", kafka.bootstrapServers.replace("PLAINTEXT://", ""))
}


def setup() {
    RestAssured.port = port
}

def "test profile update events"() {

    given:
    String INPUT_TOPIC = "EventXX"

    when:
    KafkaProducer<String, String> kafkaProducer = createProducer()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()

    then:
    TestUtil.waitFor({ EventConsumer.msgConsumed == true }, 5000)
    kafkaProducer.close()

}

Now interestingly if I add Thread.sleep(10000) before sending the message in the test it always works but this approach looks kind of dirty to me. How can we make sure that kafka server is up and running before running any test. I tried the following approach by validating the kafkaSendRecieve in the setupSpec but failed . I am pasting the code below

def validatekafkaSendRecieve() {
    def started = false
    String INPUT_TOPIC = "kafkaTest"
    def producer = createProducer()
    def consumer = createConsumer(INPUT_TOPIC)
    Thread.sleep(9000)

    while (!started) {
        producer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get()
        started = consumeMessage(INPUT_TOPIC, consumer)
    }
    producer.close()
    consumer.close()
}

def consumeMessage(String topic, KafkaConsumer kafkaConsumer) {
    def message = kafkaConsumer.poll(3)
    if (!message.isEmpty()) {
        def messageList = message.records(topic).asList()
        if (messageList != null && !message.isEmpty()) {
            return true
        }
    } else {
        return false
    }
}

Upvotes: 0

Views: 2452

Answers (1)

Vassilis
Vassilis

Reputation: 1054

Are you pre-creating the topic at which you produce? Try doing this. Not sure if this the problem you are facing (logs are needed), but when a topic is auto-created it takes some time for all its partitions to be assigned a leader.

Upvotes: 1

Related Questions