coding owl
coding owl

Reputation: 73

Kafka listener in test container, avoiding port 9092

I've written an Integration Test to show that a message sent to Kafka will arrive with a listener. It passes if and only if I use KAFKA_PORT=9092. That constant is the port used on the developer machine (or CI machine).

Ultimately I want to do this on a dynamically allocated port (i.e. using a GenericContainer and not a FixedHostPortGenericContainer) but for the moment I would simply like to be able to use a different port.

If I set KAFKA_PORT=59092 in the below code then the test fails and I see console output such as Connection to node -1 (localhost/127.0.0.1:9092) could not be established e.g.:

2020-06-08 12:16:22.374  WARN 1371 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

I assume there is some additional configuration I need to do so that no attempt is made to use port 9092, but this is eluding me.

A recreated, stripped back test is below, as is the associated gradle.build.

KafaSpikeFixedPort.java

package com.example.kafkaspike;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@Testcontainers
public class KafaSpikeFixedPort {

    // Test works only if this port is 9092 (matching the Docker container port)
    final static int KAFKA_PORT = 9092;

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.producer.bootstrap-servers",
                     () -> "kubernetes.docker.internal:"+ KAFKA_PORT);
        registry.add("spring.kafka.consumer.group-id",
                     () -> "test-consumer-group");
    }

    @Container
    private GenericContainer kafkaContainer =
            new FixedHostPortGenericContainer("obsidiandynamics/kafka:2.3.0-11")
                    .withFixedExposedPort(KAFKA_PORT, 9092)
                    .withExtraHost("kubernetes.docker.internal", "127.0.0.1")
                    .withEnv("KAFKA_LISTENERS",
                             "INTERNAL://:29092," +
                             "EXTERNAL://:"+KAFKA_PORT)
                    .withEnv("KAFKA_ADVERTISED_LISTENERS",
                             "INTERNAL://kubernetes.docker.internal:29092," +
                             "EXTERNAL://kubernetes.docker.internal:"+KAFKA_PORT)
                    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                             "INTERNAL:PLAINTEXT," +
                             "EXTERNAL:PLAINTEXT")
                    .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME",
                             "INTERNAL")
                    .waitingFor(Wait.forLogMessage(
                            ".*INFO\\s+\\[KafkaServer\\s+id=\\d+\\]" +
                            "\\s+started\\s+\\(kafka.server.KafkaServer\\).*",
                            1));

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    private List<String> payloadsReceived = new ArrayList<>();

    @KafkaListener(autoStartup = "false", topics = "topic1")
    public void onMessage(@Payload String payload) {
        payloadsReceived.add(payload);
    }

    @BeforeEach
    public void beforeEach() {
        payloadsReceived.clear();
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.start();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @AfterEach
    public void afterEach() {
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.stop();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @Test
    @Timeout(value = 3, unit = TimeUnit.SECONDS)
    public void test() {
        kafkaTemplate.send("topic1", "Hello World!");
        sleep(1_000); // Just for the spike. (Wait for message in production test, with test timeout.)
        assertAll(
                () -> assertEquals(1, payloadsReceived.size()),
                () -> assertEquals("Hello World!", payloadsReceived.get(0))
                 );
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch(InterruptedException e) {
        }
    }
}

gradle.build

plugins {
    id 'org.springframework.boot' version '2.2.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    // for TestContainers
    testCompile group: 'org.testcontainers', name: 'testcontainers', version: "1.14.1"
    testCompile group: 'org.testcontainers', name: 'junit-jupiter', version: "1.14.1"
}

test {
    useJUnitPlatform()
}

Upvotes: 3

Views: 10272

Answers (1)

bsideup
bsideup

Reputation: 3073

There are reasons why Testcontainers' Kafka module exists :)

It takes care of the ports setup by deferring running Kafka's process, so that it can provide the actual assigned random port as ADVERTISED_HOST env variable.

Try it, or have a look at the sources for some inspiration:
https://github.com/testcontainers/testcontainers-java/tree/master/modules/kafka

Upvotes: 5

Related Questions