Reputation: 61
I have MSK running on AWS and I'd like to consume information using AWS_MSK_IAM authentication.
My MSK is properly configured and I can consume the information using Kafka CLI with the following command:
../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning
My client_auth.properties has the following information:
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
When I try to consume from my Databricks cluster using spark, I receive the following error:
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
The libraries I'm using in the cluster:
And the code I'm running on Databricks:
raw = (
spark
.readStream
.format('kafka')
.option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
.option('subscribe', 'TopicTest')
.option('startingOffsets', 'earliest')
.option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
.option('kafka.security.protocol', 'SASL_SSL')
.option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
.option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
.load()
)
Upvotes: 1
Views: 4369
Reputation: 21
I also have faced this issue and found the mistake in the config. Databricks uses shaded kafka libraries, that's why we should use paths to shaded libraries in the config as described here:
sasl.jaas.config = shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class = shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler
Upvotes: 2
Reputation: 11
I faced the same issue, I forked aws-msk-iam-auth
in order to make it compatible with databricks. Just add the jar from the following release https://github.com/Iziwork/aws-msk-iam-auth-for-databricks/releases/tag/v1.1.2-databricks to your cluster.
Upvotes: 1
Reputation: 70
Though I haven't tested this, based on the comment from Andrew on being theoretically able to relocate the dependency, I dug a bit into the source of aws-msk-iam-auth. They have a compileOnly('org.apache.kafka:kafka-clients:2.4.1')
in their build.gradle
. Hence the uber jar doesn't contain this library and is picked up from whatever databricks has (and shaded).
They are also relocating all their dependent jars with a prefix. So changing the compileOnly
to implementation
and rebuilding the uber jar with gradle clean shadowJar
should include and relocate the kafka jars without any conflicts when uploading to databricks.
Upvotes: 2