matejs
matejs

Reputation: 3536

How to use @ServiceConnection with GenericContainer and custom ConnectionDetails

I am using new @ServiceConnection annotation to manage KafkaContainer in my tests and it works as expected - KafkaContainerConnectionDetails is used during auto configuration to override bootstrap servers property.

Now, i also want to use custom GenericContainer subclass i created - SchemaRegistryContainer.

I would also like to manage this container with @ServiceConnection annotation, but it crashes with ConnectionDetailsNotFoundException: No ConnectionDetails found for source '@ServiceConnection source for Bean 'schemaRegistryContainer'.

I would like have my own custom ConnectionDetails (or some alternative way) for this @ServiceConnection, so that i can modify application properties i need when SchemaRegistryContainer starts. In this specific case, i need to modify property spring.kafka.consumer.properties.schema-registry-url with url from container.

Is this there a way to achieve this with @ServiceDefinition? I dont want to fall back to using static fields and @DynamicPropertySource.

My test configuration

@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfig {
    public static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
    public static final String CONFLUENT_PLATFORM_VERSION = "7.4.1";

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DEFAULT_IMAGE_NAME.withTag(CONFLUENT_PLATFORM_VERSION));
    }

    @Bean
    @ServiceConnection
    SchemaRegistryContainer schemaRegistryContainer(KafkaContainer kafkaContainer) {
        return new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION, kafkaContainer);
    }
}

SchemaRegistryContainer:

public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

    private static final int PORT = 8081;

    public SchemaRegistryContainer(String confluentVersion, KafkaContainer kafka) {
        super("confluentinc/cp-schema-registry:" + confluentVersion);

        withExposedPorts(PORT);
        dependsOn(kafka);
        withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafka.getBootstrapServers());
    }

    public String getSchemaRegistryUrl() {
        return "http://" + getHost() + ":" + getMappedPort(PORT);
    }
}

Upvotes: 0

Views: 3604

Answers (2)

Zoltan Altfatter
Zoltan Altfatter

Reputation: 960

How did you wire with testcontainers the schema registry URL (which is dynamically created) when creating a SpecificAvroSerde? When creating the pipeline I need to depend on Environment instead of KafkaProperties which is not really nice.

The @ServiceConnection does not help in this case.

And the @DynamicPropertySource is not updating the Spring provided KafkaProperties only the Environment so I ended up using

private SpecificAvroSerde<TicketSale> ticketSaleSerde() {
    final SpecificAvroSerde<TicketSale> serde = new SpecificAvroSerde<>();
    Map<String, String> config = new HashMap<>();

    // This does not work with testcontainers, the @DynamicPropertySource is
    // not updating the KafkaProperties only the Environment
    // String schemaRegistryURL = kafkaProperties.getStreams().getProperties().get(SCHEMA_REGISTRY_URL_CONFIG);

    String schemaRegistryURL = env.getProperty("spring.kafka.streams.properties[0].schema.registry.url");

    logger.info("setting the ticketSaleSerde - schemaRegistryURL:" + schemaRegistryURL);
    config.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
    serde.configure(config, false);
    return serde;
}

and I have the following:

@DynamicPropertySource
static void props(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.streams.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.streams.properties[0].schema.registry.url",
            new Supplier<>() {
                @Override
                public Object get() {
                    return "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081);
                }
            });
}

Upvotes: 0

matejs
matejs

Reputation: 3536

I found out i was misunderstanding what @ServiceConnection was doing. In fact, this annotation is not needed at all, if i dont have custom ConnectionDetails but i still can use container as @Bean.

For customizing properties after contaner starts, i used DynamicPropertyRegistry. This is in fact also mentioned in docs: https://docs.spring.io/spring-boot/docs/3.1.0/reference/htmlsingle/#features.testing.testcontainers.at-development-time.dynamic-properties

New code:

    @Bean
    SchemaRegistryContainer schemaRegistryContainer(KafkaContainer kafkaContainer, DynamicPropertyRegistry dynamicPropertyRegistry) {
        SchemaRegistryContainer schemaRegistryContainer = new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION, kafkaContainer);
        dynamicPropertyRegistry.add("spring.kafka.consumer.properties.schema.registry.url", schemaRegistryContainer::getTarget);
        dynamicPropertyRegistry.add("spring.kafka.producer.properties.schema.registry.url", schemaRegistryContainer::getTarget);
        return schemaRegistryContainer;
    }

Upvotes: 0

Related Questions