Tomáš Sedloň

Reputation: 163

Spark Structured Streaming: Not authorized to access group

I am trying to read data from Kafka via Spark Structured Streaming. However, in Spark 2.4.0 you cannot set the group id for the stream (see "How to set group.id for consumer group in kafka data source in Structured Streaming?").

Because the group id cannot be set, Spark generates one itself, and I am stuck at a GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2

Any ideas on how to work around this? Oddly, I am able to read this data via kafka-console-consumer.sh, where I can pass the group id in a .properties file.

Code throwing the exception:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location")
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()

Upvotes: 2

Views: 5217

Answers (1)

Tomáš Sedloň

Reputation: 163

It seems this cannot be solved from the consumer's side in Spark 2.4.0. We ended up using bin/kafka-acls.sh with a prefixed resource pattern, which acts like a wildcard, to allow all group ids generated by Structured Streaming.

Kafka ACL example:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=zk:2181 \
  --add --allow-principal User:'Bon' \
  --operation READ \
  --topic topicName \
  --group='spark-kafka-source-' \
  --resource-pattern-type prefixed
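
To illustrate why the prefixed pattern is enough, here is a minimal Scala sketch of the matching rule: a prefixed ACL covers any group id that starts with the given prefix. The object and method names (PrefixedAclSketch, coveredByPrefix) are hypothetical and exist only for this illustration; they are not part of Kafka or Spark, and the real check happens inside the broker's authorizer.

```scala
// Hypothetical helper for illustration only; not a Kafka or Spark API.
object PrefixedAclSketch {
  // Prefix granted by the --group / --resource-pattern-type prefixed ACL above.
  val aclPrefix: String = "spark-kafka-source-"

  // A prefixed resource pattern covers every resource name that starts with it.
  def coveredByPrefix(groupId: String, prefix: String = aclPrefix): Boolean =
    groupId.startsWith(prefix)

  def main(args: Array[String]): Unit = {
    // Group id copied from the GroupAuthorizationException in the question.
    val generated =
      "spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2"
    println(coveredByPrefix(generated))            // true
    println(coveredByPrefix("idThatShouldBeUsed")) // false
  }
}
```

Since the generated ids seen in the error all begin with spark-kafka-source-, a single prefixed ACL covers them without knowing the random suffix in advance. The trade-off is that any consumer group whose name starts with that prefix gets the same READ permission for that principal.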

Upvotes: 5
