Danilo Cairolli

Reputation: 61

Kafka consumer using AWS_MSK_IAM ClassCastException error

I have Amazon MSK running on AWS, and I'd like to consume messages from it using AWS_MSK_IAM authentication.

My MSK cluster is properly configured, and I can consume the messages with the Kafka CLI using the following command:

../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning

My client_auth.properties has the following information:

# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
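
Spark's Kafka source passes any option prefixed with `kafka.` straight through to the underlying Kafka consumer, so each client property above corresponds to one Spark option. A minimal sketch of that mapping follows; the helper `properties_to_spark_options` is hypothetical, and it only handles simple `key = value` lines, not the full Java properties syntax (no `:` separators, line continuations, or escapes).

```python
def properties_to_spark_options(text: str) -> dict:
    """Turn simple `key = value` client properties into Spark Kafka options.

    Every key gets the "kafka." prefix, which Spark strips before handing the
    setting to the Kafka consumer. Comment lines (# or !) and blank lines are
    skipped; this sketch does not implement the full .properties format.
    """
    options = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key=value line; ignored here
        options["kafka." + key.strip()] = value.strip()
    return options


# The client_auth.properties content from above.
CLIENT_AUTH = """\
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
sasl.mechanism = AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
"""

opts = properties_to_spark_options(CLIENT_AUTH)
```

The resulting dictionary matches the `kafka.*` options that the Spark code further down sets by hand.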

When I try to consume from my Databricks cluster using Spark, I receive the following error:

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
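
The exception names both types involved: the handler class being cast and the interface Spark expects it to implement. A small, hypothetical helper (not part of any library) can split them apart and show the package prefix in front of `org.apache.kafka.` on the target side. A non-empty prefix such as `kafkashaded.` indicates that the expected interface comes from a relocated (shaded) copy of kafka-clients, so a handler built against the plain `org.apache.kafka` interface is a different type as far as the JVM is concerned. The regex is an assumption covering the two common JVM message formats and may need adjusting for others.

```python
import re
from typing import Optional, Tuple

# Matches "<source> cannot be cast to <target>", optionally with the
# "class " qualifiers that newer JVMs put in front of each name.
_CAST_RE = re.compile(
    r"(?:class )?(?P<source>[\w.$]+) cannot be cast to (?:class )?(?P<target>[\w.$]+)"
)


def relocation_prefix(
    error_message: str, original_package: str = "org.apache.kafka."
) -> Optional[Tuple[str, str, Optional[str]]]:
    """Return (source, target, prefix) for a ClassCastException message.

    `prefix` is whatever precedes `original_package` in the target class name:
    "" if the target is unrelocated, None if the package does not appear.
    Returns None when the message contains no cast at all.
    """
    match = _CAST_RE.search(error_message)
    if match is None:
        return None
    source, target = match.group("source"), match.group("target")
    idx = target.find(original_package)
    prefix = target[:idx] if idx >= 0 else None
    return source, target, prefix


ERROR = (
    "Caused by: kafkashaded.org.apache.kafka.common.KafkaException: "
    "java.lang.ClassCastException: "
    "software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to "
    "kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler"
)
print(relocation_prefix(ERROR)[2])  # kafkashaded.
```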

Here is my cluster config: [screenshot of the cluster configuration]

The libraries I'm using in the cluster: [screenshot of the installed cluster libraries]

And the code I'm running on Databricks:

raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
        .option('subscribe', 'TopicTest') 
        .option('startingOffsets', 'earliest')
        .option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
        .option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
        .load()
)

Upvotes: 1

Views: 4369

Answers (3)

Kudrat

Reputation: 21

I also faced this issue and found the mistake in the config. Databricks uses shaded Kafka libraries, which is why the config has to reference the shaded class paths, as described here:

sasl.jaas.config = shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class = shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler
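
Applied to the Spark code from the question, that change might look like the following sketch. It assumes the Databricks runtime in use bundles the `shadedmskiam`-relocated library (the answer doesn't say which runtime versions do), and the helper names `msk_iam_options` and `read_msk_stream` are hypothetical. Only the two class names differ from the question's options.

```python
# Prefix of Databricks' shaded copy of aws-msk-iam-auth, per the answer above.
SHADED_MSK_IAM_PREFIX = "shadedmskiam."


def msk_iam_options(bootstrap_servers: str, topic: str) -> dict:
    """Spark Kafka source options for AWS_MSK_IAM using the shaded class names."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": (
            SHADED_MSK_IAM_PREFIX + "software.amazon.msk.auth.iam.IAMLoginModule required;"
        ),
        "kafka.sasl.client.callback.handler.class": (
            SHADED_MSK_IAM_PREFIX + "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        ),
    }


def read_msk_stream(spark, bootstrap_servers: str, topic: str):
    """Return a streaming DataFrame; `spark` is the notebook's SparkSession."""
    return (
        spark.readStream
        .format("kafka")
        .options(**msk_iam_options(bootstrap_servers, topic))
        .load()
    )
```

In a notebook, `raw = read_msk_stream(spark, "<bootstrap-servers>", "TopicTest")` would take the place of the original `raw = (...)` block.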

Upvotes: 2

Carlo Abi Chahine

Reputation: 11

I faced the same issue, so I forked aws-msk-iam-auth to make it compatible with Databricks. Just add the JAR from the following release to your cluster: https://github.com/Iziwork/aws-msk-iam-auth-for-databricks/releases/tag/v1.1.2-databricks

Upvotes: 1

vishnu.bh

Reputation: 70

I haven't tested this, but based on Andrew's comment that the dependency could theoretically be relocated, I dug a bit into the source of aws-msk-iam-auth. Their build.gradle declares compileOnly('org.apache.kafka:kafka-clients:2.4.1'), so the uber JAR doesn't contain kafka-clients; at runtime it is picked up from whatever Databricks provides (which is shaded).

They also relocate all their bundled dependencies under a prefix. So changing compileOnly to implementation and rebuilding the uber JAR with gradle clean shadowJar should include and relocate the Kafka classes, without any conflicts when the JAR is uploaded to Databricks.
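
Whether a rebuilt JAR really bundles and relocates kafka-clients can be checked by looking inside it, since a JAR is a ZIP archive. This untested sketch (the helper name `bundled_kafka_prefixes` is hypothetical) lists every prefix under which `AuthenticateCallbackHandler` appears. With the current compileOnly build the list would be expected to be empty; after the change, the relocation prefix configured in build.gradle would be expected to appear instead.

```python
import zipfile

# Path of the Kafka interface that SASL callback handlers implement,
# as it appears inside an unrelocated kafka-clients JAR.
HANDLER_INTERFACE = "org/apache/kafka/common/security/auth/AuthenticateCallbackHandler.class"


def bundled_kafka_prefixes(jar) -> list:
    """List the package prefixes under which `jar` bundles the interface.

    `jar` may be a path or a binary file object (a JAR is a ZIP archive).
    [] means kafka-clients is not inside the JAR (e.g. a compileOnly dependency);
    [""] means it is bundled but not relocated.
    """
    prefixes = []
    with zipfile.ZipFile(jar) as archive:
        for name in archive.namelist():
            # Require a "/" boundary so "xorg/apache/..." does not match.
            if name == HANDLER_INTERFACE or name.endswith("/" + HANDLER_INTERFACE):
                prefix = name[: -len(HANDLER_INTERFACE)]
                prefixes.append(prefix.replace("/", "."))
    return prefixes
```

For example, `bundled_kafka_prefixes("path/to/rebuilt-uber.jar")`. Comparing the result with the `kafkashaded.` prefix in the question's error message shows whether the handler would implement the same copy of the interface that Spark casts to.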

Upvotes: 2
