Apache Beam with Python. Error trying to authenticate Apache Kafka SASL_SSL OAUTHBEARER

I am developing a pipeline in Apache Beam 2.45.0 with Python 3.8.

I need to read data from Apache Kafka, using ReadfromKafka method.

The Apache Kafka server is securized with SASL_SSL protocol, using OAUTHBEARER mechanism.

I have developed a CallBackHandler in aim to manage tokens with Authn Server (it works on a simple kafka-consumer with kafka-python lib) but when I try this on Apache Beam over Dataflow, I get the following error:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

This is my pipeline code:

p = beam.Pipeline(options=pipeline_options)

consumer_config = {'bootstrap.servers': self.bootstrap_servers,
                   'auto.offset.reset': 'earliest',
                   'security.protocol': 'SASL_SSL',
                   'sasl.mechanism': 'OAUTHBEARER',
                   'sasl.oauth.token.provider': TokenProvider(self.project_id, self.secret_id) # CallBackHandler class
                   }

messages = (
        p
        | "Read from Kafka" >> beam.io.kafka.ReadFromKafka(consumer_config=consumer_config, topics=[self.topic])
)

Any ideas?

Thanks!

Upvotes: 1

Views: 323

Answers (0)

Related Questions