Reputation: 31
I need to write an integration test for Kafka and Schema Registry working together because of utilizing AVRO serialization.
My Maven configured as following (Apple M2 Max)
% mvn -version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: /opt/homebrew/Cellar/maven/3.9.9/libexec
Java version: 17.0.13, vendor: Amazon.com Inc., runtime: /Users/neo.bsuir/.sdkman/candidates/java/17.0.13-amzn
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "15.1.1", arch: "aarch64", family: "mac"
My test looks like
@SpringBootTest
@Testcontainers
public class KafkaIntegrationTest {
public static final String CONFLUENT_PLATFORM_VERSION = "7.5.1";
private static final ConfluentKafkaContainer KAFKA_CONTAINER;
private static final GenericContainer<?> SCHEMA_REGISTER_CONTAINER;
static {
final Network network = Network.newNetwork();
KAFKA_CONTAINER =
new ConfluentKafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(network)
.withNetworkAliases("kafka")
.withEnv("KAFKA_BROKER_ID", "1")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
.withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:9092,BROKER://kafka:9093,CONTROLLER://kafka:9094")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT")
.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withExposedPorts(9092, 9093, 9094)
.waitingFor(Wait.forListeningPorts(9092));
KAFKA_CONTAINER.start();
SCHEMA_REGISTER_CONTAINER = new GenericContainer<>(
DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(network)
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",KAFKA_CONTAINER.getBootstrapServers())
.dependsOn(KAFKA_CONTAINER)
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200)
.withStartupTimeout(Duration.ofSeconds(5)));
SCHEMA_REGISTER_CONTAINER.start();
}
@Test
void shouldHaveHealthyContainers() {
assertThat(KAFKA_CONTAINER.isHealthy()).isTrue();
assertThat(SCHEMA_REGISTER_CONTAINER.isHealthy()).isTrue();
}
@AfterAll
static void afterAll() {
if (SCHEMA_REGISTER_CONTAINER != null) {
SCHEMA_REGISTER_CONTAINER.stop();
}
if (KAFKA_CONTAINER != null) {
KAFKA_CONTAINER.stop();
}
}
}
I passed getBootstrapServers()
to SCHEMA_REGISTER_CONTAINER
but the test failed due to Connection to node -1 (localhost/127.0.0.1:54380) could not be established. Broker may not be available.
Full output from target/surefire-reports/com.example.testcontainers.KafkaIntegrationTest.txt
:
-------------------------------------------------------------------------------
Test set: com.example.testcontainers.KafkaIntegrationTest
-------------------------------------------------------------------------------
Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 9.166 s <<< FAILURE! -- in com.example.testcontainers.KafkaIntegrationTest
com.example.testcontainers.KafkaIntegrationTest.shouldHaveHealthyContainers -- Time elapsed: 0.004 s <<< ERROR!
java.lang.ExceptionInInitializerError
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
...
Caused by: org.testcontainers.containers.ContainerLaunchException: Container startup failed for image confluentinc/cp-schema-registry:7.5.1
at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:351)
at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
at com.example.testcontainers.KafkaIntegrationTest.<clinit>(KafkaIntegrationTest.java:58)
... 5 more
Caused by: org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception
at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88)
at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
... 7 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container
at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:556)
at org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:346)
at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
... 8 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Timed out waiting for URL to be accessible (http://localhost:55772/subjects should return HTTP [200])
at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.waitUntilReady(HttpWaitStrategy.java:320)
at org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:52)
at org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:909)
at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:492)
... 10 more
Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result with exception
at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.waitUntilReady(HttpWaitStrategy.java:252)
... 13 more
Caused by: java.lang.RuntimeException: java.net.SocketException: Unexpected end of file from server
at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$null$6(HttpWaitStrategy.java:312)
at org.rnorth.ducttape.ratelimits.RateLimiter.doWhenReady(RateLimiter.java:27)
at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$waitUntilReady$7(HttpWaitStrategy.java:257)
at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:954)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:761)
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:951)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:761)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1725)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1626)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
at org.testcontainers.containers.wait.strategy.HttpWaitStrategy.lambda$null$6(HttpWaitStrategy.java:276)
... 7 more
com.example.testcontainers.KafkaIntegrationTest -- Time elapsed: 9.166 s <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class com.example.testcontainers.KafkaIntegrationTest
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: java.lang.ExceptionInInitializerError: Exception org.testcontainers.containers.ContainerLaunchException: Container startup failed for image confluentinc/cp-schema-registry:7.5.1 [in thread "main"]
at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:351)
at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
at com.example.testcontainers.KafkaIntegrationTest.<clinit>(KafkaIntegrationTest.java:58)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
... 1 more
Complete code example available at https://github.com/AlexOreshkevich/cp-kafka-schema-registry-testcontainers
I tried to use as a value for SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
many options, incl
KAFKA_CONTAINER.getBootstrapServers()
kafka:9092
KAFKA_CONTAINER.getNetworkAliases().get(0) + ":9092"
Also I tried to use latest Confluent Platform (7.8.0
), same issues.
I expect test to run successfully which means schema registry connected to kafka broker. After that I could proceed with producer/consumer config to test AVRO etc.
Upvotes: 1
Views: 73
Reputation: 6530
You can find a complete example here. It uses org.testcontainers.kafka.KafkaContainer, but you can replace it with org.testcontainers.kafka.ConfluentKafkaContainer. Both, allow to register additional listeners.
Upvotes: 0
Reputation: 31
The root cause was an attempt to start container in static initialization block.
Also new Kafka containers available in testcontainers 1.20.X were not be able to work with schema registry, working one is org.testcontainers.containers.KafkaContainer
.
Minor notice: there is no need to explicitly call start()
for KAFKA_CONTAINER
, because it was marked as a dependency for the schema registry and would be started implicitly for it.
Full working example looks like:
package com.example.testcontainers;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest
@Testcontainers
public class KafkaIntegrationTest {
public static final String CONFLUENT_PLATFORM_VERSION = "7.8.0";
private static final GenericContainer<?> KAFKA_CONTAINER;
private static final GenericContainer<?> SCHEMA_REGISTER_CONTAINER;
static {
final Network network = Network.newNetwork();
KAFKA_CONTAINER =
new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(network)
.withStartupTimeout(java.time.Duration.ofSeconds(30));
SCHEMA_REGISTER_CONTAINER = new GenericContainer<>(
DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(network)
.dependsOn(KAFKA_CONTAINER)
.withExposedPorts(8081)
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
.withStartupTimeout(Duration.ofSeconds(30))
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
KAFKA_CONTAINER.getNetworkAliases().get(0) + ":9092");
}
@BeforeAll
static void init() {
SCHEMA_REGISTER_CONTAINER.start();
}
@Test
void shouldHaveHealthyContainers() {
assertThat(KAFKA_CONTAINER.isRunning()).isTrue();
assertThat(SCHEMA_REGISTER_CONTAINER.isRunning()).isTrue();
}
@AfterAll
static void afterAll() {
if (SCHEMA_REGISTER_CONTAINER != null) {
SCHEMA_REGISTER_CONTAINER.stop();
}
if (KAFKA_CONTAINER != null) {
KAFKA_CONTAINER.stop();
}
}
}
Upvotes: 1