Khalil Kabara

Reputation: 168

Kafka consumer works in simple Java application but fails in Spring context

I defined a Kafka consumer as shown in the code below. It runs, authenticates, and consumes fine when I start it from the main method. However, if I convert this class into a Spring component, authentication fails when the Spring app starts. The consumer is part of a larger Spring application that I could not get running, so to isolate the problem I created the simple Java class below to test only the consumer. It works in that context but fails inside the larger Spring app.

The application prints the following line repeatedly:

org.apache.kafka.clients.NetworkClient: Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

org.apache.kafka.clients.NetworkClient: Bootstrap broker server1.my_domain.com disconnected

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SampleConsumer {

    public static final String BOOTSTRAP_SERVERS = "server1.my_domain.com";
    public static final String TOPIC = "SOME_TOPIC";
    public static final String CONSUMER_GROUP = "SOME_GROUP";

    public static void start(){
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("sasl.jaas.config", getJaasConfig());

        // Key and value types match the StringDeserializer configured above.
        // try-with-resources closes the consumer once the loop exits.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            final int giveUp = 100;
            int noRecordCount = 0;

            while (true) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                if (consumerRecords.count() == 0) {
                    // Give up once more than giveUp polls have come back empty.
                    noRecordCount++;
                    if (noRecordCount > giveUp) break;
                    else continue;
                }
                consumerRecords.forEach(System.out::println);
            }
        }
    }

    private static String getJaasConfig(){
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " serviceName=\"kafka\""
                + " doNotPrompt=true"
                + " useKeyTab=true"
                + " storeKey=true"
                + " principal=\"[email protected]\""
                + " keyTab=\"D:/certs/sampleKeyTab\";";
    }

    public static void main(String[] args) {
        start();
    }
}

Upvotes: 1

Views: 712

Answers (1)

JavaTechnical

Reputation: 9357

The snippet you posted uses static methods. I assume your Spring component has non-static versions of them, since static methods are quite atypical in Spring components.

Check that your Spring component class is properly initialized, i.e. that all the required properties are set, and check whether you are catching and suppressing exceptions.

If you are not using Spring's Kafka support and rely on a plain KafkaConsumer, then Spring has no control over the KafkaConsumer you create. It does not manage it, so the consumer should behave the same whether or not it is created inside a Spring component.

If you have created a Spring component, the start() logic typically belongs in a @PostConstruct method, excluding the continuous polling: a loop that never returns would block post-construction and stall application startup.
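That lifecycle can be sketched without any Spring or Kafka dependency, so it runs on its own. In a real component, start() would carry @PostConstruct and stop() would carry @PreDestroy. The class name PollingWorker and the pollOnce callback (standing in for consumer.poll(...) plus record handling) are hypothetical names for this illustration, not part of the question's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingWorker {

    // Hypothetical stand-in for one consumer.poll(...) call plus record handling.
    private final Runnable pollOnce;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean running = new AtomicBoolean(false);

    public PollingWorker(Runnable pollOnce) {
        this.pollOnce = pollOnce;
    }

    // In a Spring component this method would be annotated with @PostConstruct.
    // It hands the loop to a background thread and returns immediately.
    public void start() {
        if (running.compareAndSet(false, true)) {
            executor.submit(() -> {
                while (running.get()) {
                    pollOnce.run();
                }
            });
        }
    }

    // In a Spring component this method would be annotated with @PreDestroy.
    // Returns true if the polling thread finished within the timeout.
    public boolean stop() {
        running.set(false);
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        PollingWorker worker = new PollingWorker(() -> {
            polls.incrementAndGet();
            try {
                Thread.sleep(10); // simulate a short poll timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();    // returns immediately
        Thread.sleep(100); // let the background loop run a few times
        System.out.println("stopped cleanly: " + worker.stop());
        System.out.println("polled at least once: " + (polls.get() > 0));
    }
}
```

With a real KafkaConsumer, keep in mind that the consumer is not thread-safe: create it and call poll() on the same background thread, and call consumer.wakeup() from stop() if a blocking poll needs to be interrupted.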

Debugging:

If you rely on Spring to supply the KafkaConsumer properties, make sure they are set correctly in your application.properties or wherever else they are loaded from. Print the properties of both consumers and compare them.
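A minimal sketch of that comparison, using only java.util. The class name ConsumerPropsDiff and the sample values in main are hypothetical. It assumes both configurations are available as Properties with String values, as in the question's code; non-String values (for example Class objects) are skipped by stringPropertyNames().

```java
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ConsumerPropsDiff {

    // Returns, for every key whose value differs (or is missing on one side),
    // an entry of the form key -> "standalone=<value> | spring=<value>".
    public static Map<String, String> diff(Properties standalone, Properties spring) {
        Set<String> keys = new TreeSet<>();
        keys.addAll(standalone.stringPropertyNames());
        keys.addAll(spring.stringPropertyNames());

        Map<String, String> differences = new TreeMap<>();
        for (String key : keys) {
            String a = standalone.getProperty(key);
            String b = spring.getProperty(key);
            if (!Objects.equals(a, b)) {
                differences.put(key, "standalone=" + a + " | spring=" + b);
            }
        }
        return differences;
    }

    public static void main(String[] args) {
        // Hypothetical sample values: the Spring side is missing two settings.
        Properties standalone = new Properties();
        standalone.setProperty("group.id", "SOME_GROUP");
        standalone.setProperty("security.protocol", "SASL_PLAINTEXT");
        standalone.setProperty("sasl.jaas.config", "<jaas string>");

        Properties spring = new Properties();
        spring.setProperty("group.id", "SOME_GROUP");

        diff(standalone, spring).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

An empty result means both consumers were built from the same settings. If it is not empty, security.protocol and sasl.jaas.config are the first keys to look at for an authentication failure like the one in the question.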

Upvotes: 2
