I am trying to consume data from a Kafka topic using Confluent's Python library, confluent-kafka. However, I get the error 'Error: KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="FindCoordinator response error: Group authorization failed."}'. I get the same error regardless of whether I use an integer or a string as my group.id.
However, using the exact same config file, I can consume data from the same topic using a different library (kafka3, which doesn't ask me to specify a consumer group) or a different tool (Kadeck / VS Code Kafka extension), so I am assuming that there's nothing wrong with the credentials or access rights themselves.
In the end, I have to get the code running using Confluent's library, though. Does anybody have an idea how to solve this error? The instance we're using is hosted on AWS if that makes any difference.
My code looks as follows:
from confluent_kafka import Consumer, KafkaError

if __name__ == "__main__":
    config = {'bootstrap.servers': 'server',
              'security.protocol': 'SASL_SSL',
              'sasl.mechanism': 'PLAIN',
              'sasl.username': 'username',
              'sasl.password': 'password',
              'group.id': 'foo',
              'auto.offset.reset': 'earliest'}

    # Create Consumer
    consumer = Consumer(config)

    # Subscribe to topic
    topic = 'mytopic'
    consumer.subscribe([topic])

    # Poll for new messages
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("Waiting...")
            elif msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure
                    print(f"Topic {msg.topic()} Partition {msg.partition()} reached end at offset {msg.offset()}")
                else:
                    print(f"Error: {msg.error()}")
            else:
                print(f"Consumed event from topic {msg.topic()} with key {msg.key().decode('utf-8')} and value {msg.value().decode('utf-8')}")
                process_message(msg)  # defined elsewhere (not shown)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
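To make this particular failure stand out from other errors in the poll loop, the numeric error code could be checked explicitly. This is only a minimal sketch: the constant is taken from val=30 in the error message above, and the helper names is_group_auth_failure and explain_error are hypothetical, not part of confluent-kafka.

```python
# Numeric code copied from the error message above (val=30).
GROUP_AUTHORIZATION_FAILED = 30


def is_group_auth_failure(error_code: int) -> bool:
    """Return True if the code matches the group authorization failure."""
    return error_code == GROUP_AUTHORIZATION_FAILED


def explain_error(error_code: int, group_id: str) -> str:
    """Build a log line that names the consumer group involved (hypothetical helper)."""
    if is_group_auth_failure(error_code):
        return f"Group authorization failed for group.id '{group_id}'"
    return f"Kafka error code {error_code}"


if __name__ == "__main__":
    print(explain_error(30, "foo"))
```

Inside the loop this could be called as explain_error(msg.error().code(), config['group.id']), so the log shows which group.id was rejected.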