Kafka Schema Registry failed to connect to Kafka Broker (Testcontainers)

I need to write an integration test in which Kafka and Schema Registry work together, because I use Avro serialization.

My Maven is configured as follows (Apple M2 Max):

% mvn -version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: /opt/homebrew/Cellar/maven/3.9.9/libexec
Java version: 17.0.13, vendor: Amazon.com Inc., runtime: /Users/neo.bsuir/.sdkman/candidates/java/17.0.13-amzn
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "15.1.1", arch: "aarch64", family: "mac"

My test looks like this:

/**
 * Integration test that starts a Confluent Kafka container and a Schema Registry container on a
 * shared Testcontainers network, then checks that both report healthy.
 *
 * <p>Both containers are created and started inside the static initializer, so any startup
 * failure surfaces as an {@code ExceptionInInitializerError} for this class.
 */
@SpringBootTest
@Testcontainers
public class KafkaIntegrationTest {

  /** Tag used for both the cp-kafka and the cp-schema-registry images. */
  public static final String CONFLUENT_PLATFORM_VERSION = "7.5.1";

  private static final ConfluentKafkaContainer KAFKA_CONTAINER;
  private static final GenericContainer<?> SCHEMA_REGISTER_CONTAINER;

  static {

    // Shared Docker network so the two containers can address each other.
    final Network network = Network.newNetwork();

    // Kafka broker, reachable on the network under the alias "kafka", with listeners,
    // advertised listeners and the security protocol map set explicitly through env variables.
    KAFKA_CONTAINER =
        new ConfluentKafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
            .withNetwork(network)
            .withNetworkAliases("kafka")
            .withEnv("KAFKA_BROKER_ID", "1")
            .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
            .withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:9092,BROKER://kafka:9093,CONTROLLER://kafka:9094")
            .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT")
            .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
            .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
            .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
            .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
            .withExposedPorts(9092, 9093, 9094)
            .waitingFor(Wait.forListeningPorts(9092));
    KAFKA_CONTAINER.start();

    // Schema Registry on the same network, configured through env variables.
    // NOTE(review): the reported failure shows the registry connecting to
    // localhost/127.0.0.1:<mapped port>, which suggests getBootstrapServers() yields a
    // host-side address that is not reachable from inside another container - confirm.
    SCHEMA_REGISTER_CONTAINER = new GenericContainer<>(
        DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
        .withNetwork(network)
        .withExposedPorts(8081)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",KAFKA_CONTAINER.getBootstrapServers())
        .dependsOn(KAFKA_CONTAINER)
        // Ready once GET /subjects answers 200; the wait gives up after 5 seconds.
        .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)
            .withStartupTimeout(Duration.ofSeconds(5)));
    SCHEMA_REGISTER_CONTAINER.start();
  }

  /** Asserts that both containers report a healthy state. */
  @Test
  void shouldHaveHealthyContainers() {
    // NOTE(review): isHealthy() presumably relies on the image declaring a Docker health
    // check - confirm that both images do.
    assertThat(KAFKA_CONTAINER.isHealthy()).isTrue();
    assertThat(SCHEMA_REGISTER_CONTAINER.isHealthy()).isTrue();
  }

  /** Stops the Schema Registry container first, then the Kafka container. */
  @AfterAll
  static void afterAll() {
    if (SCHEMA_REGISTER_CONTAINER != null) {
      SCHEMA_REGISTER_CONTAINER.stop();
    }
    if (KAFKA_CONTAINER != null) {
      KAFKA_CONTAINER.stop();
    }
  }
}

I passed getBootstrapServers() to SCHEMA_REGISTER_CONTAINER, but the test failed with: Connection to node -1 (localhost/127.0.0.1:54380) could not be established. Broker may not be available.

Full output from target/surefire-reports/com.example.testcontainers.KafkaIntegrationTest.txt:

-------------------------------------------------------------------------------
Test set: com.example.testcontainers.KafkaIntegrationTest
-------------------------------------------------------------------------------
Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 9.166 s <<< FAILURE! -- in com.example.testcontainers.KafkaIntegrationTest
com.example.testcontainers.KafkaIntegrationTest.shouldHaveHealthyContainers -- Time elapsed: 0.004 s <<< ERROR!
java.lang.ExceptionInInitializerError
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
...
Caused by: org.testcontainers.containers.ContainerLaunchException: Container startup failed for image confluentinc/cp-schema-registry:7.5.1
    at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:351)
    at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
    at com.example.testcontainers.KafkaIntegrationTest.<clinit>(KafkaIntegrationTest.java:58)
    ... 5 more
Caused by: org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception
    at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88)
    at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
    ... 7 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container
    at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:556)
    at org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:346)
    at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
    ... 8 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Timed out waiting for URL to be accessible (http://localhost:55772/subjects should return HTTP [200])
    at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.waitUntilReady(HttpWaitStrategy.java:320)
    at org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:52)
    at org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:909)
    at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:492)
    ... 10 more
Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result with exception
    at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
    at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.waitUntilReady(HttpWaitStrategy.java:252)
    ... 13 more
Caused by: java.lang.RuntimeException: java.net.SocketException: Unexpected end of file from server
    at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$null$6(HttpWaitStrategy.java:312)
    at org.rnorth.ducttape.ratelimits.RateLimiter.doWhenReady(RateLimiter.java:27)
    at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$waitUntilReady$7(HttpWaitStrategy.java:257)
    at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:954)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:761)
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:951)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:761)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1725)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1626)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
    at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$null$6(HttpWaitStrategy.java:276)
    ... 7 more

com.example.testcontainers.KafkaIntegrationTest -- Time elapsed: 9.166 s <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class com.example.testcontainers.KafkaIntegrationTest
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: java.lang.ExceptionInInitializerError: Exception org.testcontainers.containers.ContainerLaunchException: Container startup failed for image confluentinc/cp-schema-registry:7.5.1 [in thread "main"]
    at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:351)
    at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
    at com.example.testcontainers.KafkaIntegrationTest.<clinit>(KafkaIntegrationTest.java:58)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
    at java.base/java.util.Optional.orElseGet(Optional.java:364)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    ... 1 more

Complete code example available at https://github.com/AlexOreshkevich/cp-kafka-schema-registry-testcontainers

I tried many different values for SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS.

I also tried the latest Confluent Platform (7.8.0), with the same result.

I expect the test to run successfully, which would mean that Schema Registry connected to the Kafka broker. After that, I could proceed with the producer/consumer configuration to test Avro, etc.

Upvotes: 1

Views: 73

Answers (2)

Eddú Meléndez
Eddú Meléndez

Reputation: 6530

You can find a complete example here. It uses org.testcontainers.kafka.KafkaContainer, but you can replace it with org.testcontainers.kafka.ConfluentKafkaContainer. Both allow registering additional listeners.

Upvotes: 0

The root cause was an attempt to start the containers in a static initialization block.

Also, the new Kafka containers available in Testcontainers 1.20.x were not able to work with Schema Registry; the one that works is org.testcontainers.containers.KafkaContainer.

Minor note: there is no need to call start() explicitly on KAFKA_CONTAINER, because it is declared as a dependency of the Schema Registry container and is therefore started implicitly along with it.

Full working example looks like:

package com.example.testcontainers;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
 * Integration test that runs a Kafka broker and a Confluent Schema Registry on one shared
 * Testcontainers network and verifies that both containers are running.
 *
 * <p>The containers are only <em>configured</em> in the static initializer; they are started in
 * {@link #init()} and stopped in {@link #afterAll()}. Starting the Schema Registry container is
 * sufficient, because Kafka is registered as its dependency.
 */
@SpringBootTest
@Testcontainers
public class KafkaIntegrationTest {

  /** Tag used for both the cp-kafka and the cp-schema-registry images. */
  public static final String CONFLUENT_PLATFORM_VERSION = "7.8.0";

  /** Port used to address the broker from the Schema Registry container. */
  private static final int KAFKA_INTERNAL_PORT = 9092;

  /** Port on which Schema Registry serves HTTP inside its container. */
  private static final int SCHEMA_REGISTRY_PORT = 8081;

  /** Startup timeout applied to each container. */
  private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(30);

  /** Network shared by both containers; kept in a field so it can be closed after the tests. */
  private static final Network NETWORK = Network.newNetwork();

  private static final GenericContainer<?> KAFKA_CONTAINER;
  private static final GenericContainer<?> SCHEMA_REGISTER_CONTAINER;

  static {
    KAFKA_CONTAINER =
        new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
            .withNetwork(NETWORK)
            .withStartupTimeout(STARTUP_TIMEOUT);

    // The registry must reach the broker through the Docker network, so it is given the
    // broker's network alias plus the internal port rather than a host-mapped address.
    final String internalBootstrapServers =
        KAFKA_CONTAINER.getNetworkAliases().get(0) + ":" + KAFKA_INTERNAL_PORT;

    SCHEMA_REGISTER_CONTAINER = new GenericContainer<>(
        DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
        .withNetwork(NETWORK)
        .dependsOn(KAFKA_CONTAINER)
        .withExposedPorts(SCHEMA_REGISTRY_PORT)
        // Keep waitingFor(...) before withStartupTimeout(...): the timeout is meant for the
        // HTTP wait strategy configured here.
        .waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
        .withStartupTimeout(STARTUP_TIMEOUT)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT)
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", internalBootstrapServers);
  }

  /** Starts Schema Registry; Kafka is started implicitly as its declared dependency. */
  @BeforeAll
  static void init() {
    SCHEMA_REGISTER_CONTAINER.start();
  }

  /** Asserts that both containers are up. */
  @Test
  void shouldHaveHealthyContainers() {
    assertThat(KAFKA_CONTAINER.isRunning()).isTrue();
    assertThat(SCHEMA_REGISTER_CONTAINER.isRunning()).isTrue();
  }

  /**
   * Stops the containers in reverse dependency order and then releases the shared network.
   *
   * <p>No null checks are needed: the fields are {@code static final} and assigned in the static
   * initializer, so they are always set by the time this method can run.
   */
  @AfterAll
  static void afterAll() {
    SCHEMA_REGISTER_CONTAINER.stop();
    KAFKA_CONTAINER.stop();
    NETWORK.close();
  }
}

Upvotes: 1

Related Questions