Reputation: 3536
I am using the new @ServiceConnection annotation to manage a KafkaContainer in my tests, and it works as expected: KafkaContainerConnectionDetails is used during auto-configuration to override the bootstrap servers property.
Now I also want to use a custom GenericContainer subclass I created, SchemaRegistryContainer.
I would like to manage this container with the @ServiceConnection annotation as well, but it crashes with ConnectionDetailsNotFoundException: No ConnectionDetails found for source '@ServiceConnection source for Bean 'schemaRegistryContainer'.
I would like to have my own custom ConnectionDetails (or some alternative) for this @ServiceConnection, so that I can modify the application properties I need when SchemaRegistryContainer starts. In this specific case, I need to set the property spring.kafka.consumer.properties.schema-registry-url to the URL from the container.
Is there a way to achieve this with @ServiceConnection?
I don't want to fall back to static fields and @DynamicPropertySource.
My test configuration:
    @TestConfiguration(proxyBeanMethods = false)
    public class ContainerConfig {

        public static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
        public static final String CONFLUENT_PLATFORM_VERSION = "7.4.1";

        @Bean
        @ServiceConnection
        KafkaContainer kafkaContainer() {
            return new KafkaContainer(DEFAULT_IMAGE_NAME.withTag(CONFLUENT_PLATFORM_VERSION));
        }

        @Bean
        @ServiceConnection
        SchemaRegistryContainer schemaRegistryContainer(KafkaContainer kafkaContainer) {
            return new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION, kafkaContainer);
        }
    }
SchemaRegistryContainer:
    public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

        private static final int PORT = 8081;

        public SchemaRegistryContainer(String confluentVersion, KafkaContainer kafka) {
            super("confluentinc/cp-schema-registry:" + confluentVersion);
            withExposedPorts(PORT);
            dependsOn(kafka);
            withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafka.getBootstrapServers());
        }

        public String getSchemaRegistryUrl() {
            return "http://" + getHost() + ":" + getMappedPort(PORT);
        }
    }
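For readers wondering why the exception appears for one container and not the other, here is a toy model of the lookup, using only the JDK. It is not Spring Boot's implementation: every class name below is hypothetical, and the only thing it tries to show is that connection details come from a factory registered for a given source type, so a custom container with no matching factory ends in a "not found" error.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class ToyConnectionLookup {

    // Hypothetical stand-ins for the two container types; not Testcontainers classes.
    static class ToyKafkaContainer {}
    static class ToySchemaRegistryContainer {}

    // Toy registry mapping a source type to a factory that produces "details".
    static class Factories {
        private final Map<Class<?>, Function<Object, String>> byType = new LinkedHashMap<>();

        void register(Class<?> sourceType, Function<Object, String> factory) {
            byType.put(sourceType, factory);
        }

        // Uses the first factory whose type matches the source, else fails.
        String detailsFor(Object source) {
            for (Map.Entry<Class<?>, Function<Object, String>> entry : byType.entrySet()) {
                if (entry.getKey().isInstance(source)) {
                    return entry.getValue().apply(source);
                }
            }
            throw new IllegalStateException(
                    "No ConnectionDetails found for source " + source.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        Factories factories = new Factories();
        // Only the Kafka-like type has a factory, mirroring the situation in the question.
        factories.register(ToyKafkaContainer.class, source -> "kafka connection details");

        System.out.println(factories.detailsFor(new ToyKafkaContainer()));
        try {
            factories.detailsFor(new ToySchemaRegistryContainer());
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In this model, registering a second factory for ToySchemaRegistryContainer would make the second lookup succeed.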
Upvotes: 0
Views: 3604
Reputation: 960
How did you wire the schema registry URL (which Testcontainers creates dynamically) when creating a SpecificAvroSerde? When creating the pipeline I have to depend on Environment instead of KafkaProperties, which is not really nice.
@ServiceConnection does not help in this case, and @DynamicPropertySource does not update the Spring-provided KafkaProperties, only the Environment, so I ended up using:
    private SpecificAvroSerde<TicketSale> ticketSaleSerde() {
        final SpecificAvroSerde<TicketSale> serde = new SpecificAvroSerde<>();
        Map<String, String> config = new HashMap<>();
        // This does not work with Testcontainers: @DynamicPropertySource is
        // not updating the KafkaProperties, only the Environment.
        // String schemaRegistryURL = kafkaProperties.getStreams().getProperties().get(SCHEMA_REGISTRY_URL_CONFIG);
        String schemaRegistryURL = env.getProperty("spring.kafka.streams.properties[0].schema.registry.url");
        logger.info("setting the ticketSaleSerde - schemaRegistryURL:" + schemaRegistryURL);
        config.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
        serde.configure(config, false);
        return serde;
    }
and I have the following:
    @DynamicPropertySource
    static void props(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        registry.add("spring.kafka.streams.bootstrap-servers", kafkaContainer::getBootstrapServers);
        registry.add("spring.kafka.streams.properties[0].schema.registry.url",
                () -> "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081));
    }
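A side note on why the registry takes a Supplier rather than a plain value: the mapped port is only known once the container is running, so the value has to be computed at lookup time. The JDK-only sketch below shows that idea. FakeContainer and LazyRegistry are hypothetical stand-ins (not the Testcontainers or Spring classes), and the port number is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class LazyPropertySketch {

    // Hypothetical container whose port is unknown until start() is called.
    static class FakeContainer {
        private Integer mappedPort; // null until started

        void start() {
            mappedPort = 49153; // arbitrary port chosen for this sketch
        }

        String getUrl() {
            if (mappedPort == null) {
                throw new IllegalStateException("container not started");
            }
            return "http://localhost:" + mappedPort;
        }
    }

    // Minimal registry: stores suppliers and evaluates them only on lookup.
    static class LazyRegistry {
        private final Map<String, Supplier<Object>> suppliers = new HashMap<>();

        void add(String name, Supplier<Object> valueSupplier) {
            suppliers.put(name, valueSupplier);
        }

        Object get(String name) {
            Supplier<Object> supplier = suppliers.get(name);
            return supplier == null ? null : supplier.get();
        }
    }

    public static void main(String[] args) {
        FakeContainer container = new FakeContainer();
        LazyRegistry registry = new LazyRegistry();

        // Registering before start() is safe because getUrl() is not called yet.
        registry.add("schema.registry.url", container::getUrl);

        container.start();
        System.out.println(registry.get("schema.registry.url")); // prints http://localhost:49153
    }
}
```

Passing container.getUrl() directly instead of the method reference would throw in this sketch, because the call would run before start().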
Upvotes: 0
Reputation: 3536
I found out I was misunderstanding what @ServiceConnection does.
In fact, this annotation is not needed at all if I don't have a custom ConnectionDetails; I can still declare the container as a @Bean.
To customize properties after the container starts, I used DynamicPropertyRegistry. This is also mentioned in the docs: https://docs.spring.io/spring-boot/docs/3.1.0/reference/htmlsingle/#features.testing.testcontainers.at-development-time.dynamic-properties
New code:
    @Bean
    SchemaRegistryContainer schemaRegistryContainer(KafkaContainer kafkaContainer, DynamicPropertyRegistry dynamicPropertyRegistry) {
        SchemaRegistryContainer schemaRegistryContainer = new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION, kafkaContainer);
        // getSchemaRegistryUrl() is the accessor defined on SchemaRegistryContainer in the question.
        dynamicPropertyRegistry.add("spring.kafka.consumer.properties.schema.registry.url", schemaRegistryContainer::getSchemaRegistryUrl);
        dynamicPropertyRegistry.add("spring.kafka.producer.properties.schema.registry.url", schemaRegistryContainer::getSchemaRegistryUrl);
        return schemaRegistryContainer;
    }
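To make the property names above less magic: entries under spring.kafka.consumer.properties.* end up as plain Kafka client properties, with the remainder of the key (here schema.registry.url) used as the client property name. The sketch below models that with a flat map and prefix stripping. It is a simplification written for illustration, not Spring Boot's binder; underPrefix is a hypothetical helper, and the URL and port are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ClientPropertySketch {

    // Collects entries under the given prefix and strips the prefix from their keys.
    static Map<String, String> underPrefix(Map<String, String> flat, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> environment = new LinkedHashMap<>();
        environment.put("spring.kafka.consumer.properties.schema.registry.url", "http://localhost:49153");
        environment.put("spring.kafka.bootstrap-servers", "localhost:9092");

        // Only the entry under the consumer "properties" prefix is returned.
        System.out.println(underPrefix(environment, "spring.kafka.consumer.properties."));
        // prints {schema.registry.url=http://localhost:49153}
    }
}
```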
Upvotes: 0