AnarchoEnte

Reputation: 568

Smallrye Kafka - Define broker per channel - No events are consumed

I'm using Smallrye-Kafka in my Quarkus application to connect to Kafka. Until now the requirement was to have a single bootstrap server that is valid for all channels. The configuration and event consumption worked without problems. This requirement has changed, and now I'm trying to configure the target broker per channel.

I've configured my application.properties and was able to connect to a different broker per topic. The startup log shows successful connections to my different brokers without any errors. The correct topics are also mentioned in the log, so everything looks fine. The problem is that no events are consumed from the topics.

My application.properties contains some config options which are valid for all channels. Besides that, there are two topics which reference a kafka-configuration option that is mentioned in the official documentation. I'm using the same consumer group for both topics on purpose. There are multiple cluster nodes of my application, and each node uses a different consumer group.

My configuration and code are shown below. First the application.properties:

# Kafka common
kafka.health-enabled=false
kafka.session.timeout.ms=45000

# Common configuration used for all channels
mp.messaging.connector=smallrye-kafka
mp.messaging.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.value-deserialization-failure-handler=kafka-value-failure-handler
mp.messaging.fail-on-deserialization-failure=false
mp.messaging.auto.offset.reset=earliest
mp.messaging.use.latest.version=true
mp.messaging.auto.register.schemas=false
mp.messaging.connections.max.idle.ms=-1
mp.messaging.failure-strategy=ignore

# Topic 1
mp.messaging.incoming.my-channel-1.topic=my-topic-1
mp.messaging.incoming.my-channel-1.group.id=my-consumer-group

mp.messaging.incoming.my-channel-1.kafka-configuration=my-configuration-1


# Topic 2
mp.messaging.incoming.my-channel-2.topic=my-topic-2
mp.messaging.incoming.my-channel-2.group.id=my-consumer-group

mp.messaging.incoming.my-channel-2.kafka-configuration=my-configuration-2

And this is the class producing the named Kafka configurations:

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.util.HashMap;
import java.util.Map;

@Singleton
@Slf4j
public class KafkaAuthConfiguration {

    private static final String SASL_CONFIGURATION_STRING = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";


    @Produces
    @Identifier("my-configuration-1")
    @Singleton
    public Map<String, Object> config1() {
        final Map<String, Object> config = createConfig("clusterUrl_1", "clusterApiKey_1", "clusterApiSecret_1");
        log.info("Created kafka config 1");
        return config;
    }

    @Produces
    @Identifier("my-configuration-2")
    @Singleton
    public Map<String, Object> config2() {
        final Map<String, Object> config = createConfig("clusterUrl_2", "clusterApiKey_2", "clusterApiSecret_2");
        log.info("Created kafka config 2");
        return config;
    }  

    private Map<String, Object> createConfig(final String clusterUrl, final String apiKey, final String apiSecret) {
        final HashMap<String, Object> config = new HashMap<>(4);

        config.put("bootstrap.servers", clusterUrl);
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("ssl.endpoint.identification.algorithm", "https");
        config.put("sasl.jaas.config", String.format(SASL_CONFIGURATION_STRING, apiKey, apiSecret));

        return config;
    }
}
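
For completeness, the consuming side is just plain @Incoming methods per channel. A simplified sketch (the payload type Object and the class name are placeholders for my actual Avro types and consumer logic):

import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
@Slf4j
public class MyEventConsumers {

    // Bound to channel "my-channel-1" (topic my-topic-1)
    @Incoming("my-channel-1")
    public void consumeFromTopic1(final Object event) {
        log.info("Received event from my-topic-1: {}", event);
    }

    // Bound to channel "my-channel-2" (topic my-topic-2)
    @Incoming("my-channel-2")
    public void consumeFromTopic2(final Object event) {
        log.info("Received event from my-topic-2: {}", event);
    }
}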

Does somebody have a hint for me as to what's missing in the configuration, so that no events are consumed from my brokers?

Upvotes: 0

Views: 205

Answers (2)

AnarchoEnte

Reputation: 568

After creating a little working example using 2 Kafka brokers in a Docker environment, I noticed that everything works as expected. Hence the issue is not in smallrye-reactive-messaging but presumably somewhere on the Kafka side.

Upvotes: 0

Ozan Günalp

Reputation: 474

This block of properties is not correct:

# Common configuration used for all channels
mp.messaging.connector=smallrye-kafka
mp.messaging.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.value-deserialization-failure-handler=kafka-value-failure-handler
mp.messaging.fail-on-deserialization-failure=false
mp.messaging.auto.offset.reset=earliest
mp.messaging.use.latest.version=true
mp.messaging.auto.register.schemas=false
mp.messaging.connections.max.idle.ms=-1
mp.messaging.failure-strategy=ignore

For configuring all channels of a connector, you need to prefix each attribute with mp.messaging.connector.smallrye-kafka.

So it would be:

mp.messaging.connector.smallrye-kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.connector.smallrye-kafka.value-deserialization-failure-handler=kafka-value-failure-handler
mp.messaging.connector.smallrye-kafka.fail-on-deserialization-failure=false
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.connector.smallrye-kafka.use.latest.version=true
mp.messaging.connector.smallrye-kafka.auto.register.schemas=false
mp.messaging.connector.smallrye-kafka.connections.max.idle.ms=-1
mp.messaging.connector.smallrye-kafka.failure-strategy=ignore
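
Channel-specific attributes still take precedence over these connector-wide defaults, so an individual channel can deviate from them if needed, for example (hypothetical override):

mp.messaging.incoming.my-channel-1.auto.offset.reset=latest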

Upvotes: 0
