Reputation: 29159
I have the following Kafka consumer. It works well if I set group_id
to None: it receives all historical messages as well as my newly sent test message.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset=auto_offset_reset,
    enable_auto_commit=enable_auto_commit,
    group_id=group_id,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
    ...  # process each message m (loop body not shown in the original)
However, it receives nothing if I set group_id
to a specific value. I ran my test producer to send new messages, and still nothing was received.
The consumer console shows the following messages:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: []
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group
Upvotes: 1
Views: 5506
Reputation: 99
I know this is not the solution to the author's problem. Still, if you landed here, you might be hitting the same symptom for a different reason, as I was.
So, at least with kafka-python v2.0.2 and an Aiven Kafka broker, the problem was solved by adding a "dry" call to consumer.poll() before iterating. This is especially odd because the call is not needed when no group_id is assigned.
Output from:
def get():
    for message in consumer:
        print(message.value)
        consumer.commit()
is nothing in this case.
The version below, on the other hand, works as expected: it reads only the messages that arrived after the last commit():
Output from:
def get():
    consumer.poll()  # "dry" poll before iterating
    for message in consumer:
        print(message.value)
        consumer.commit()
is every message in this topic since the last commit, as expected.
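Instead of mixing a dry poll() with the message iterator, one could drive the consumer entirely through explicit poll() calls, which also makes it easy to notice an empty partition assignment like the one in the question's log. This is a minimal sketch, not the answer's method: the helper name consume_with_poll and the handle callback are hypothetical, and it assumes a kafka-python KafkaConsumer (or any object with poll(), assignment() and commit()). poll() returns a dict mapping TopicPartition to a list of records, which may be empty.

```python
def consume_with_poll(consumer, handle, max_polls=10, timeout_ms=1000):
    """Consume via explicit poll() calls; returns the number of records handled.

    `consumer` is assumed to be a kafka-python KafkaConsumer (or anything with
    poll(), assignment() and commit()); `handle` is a hypothetical callback.
    """
    handled = 0
    for _ in range(max_polls):
        # poll() drives group membership and returns {TopicPartition: [ConsumerRecord, ...]}
        batches = consumer.poll(timeout_ms=timeout_ms)
        if not consumer.assignment():
            # Empty assignment: another member of the group may hold every partition
            print("warning: no partitions assigned to this consumer (yet)")
        for records in batches.values():
            for record in records:
                handle(record.value)
                handled += 1
        if batches:
            # Manual commit, for setups that use enable_auto_commit=False
            consumer.commit()
    return handled
```

For example, consume_with_poll(consumer, print) would print the values from up to ten polls and commit after each non-empty batch.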
For reference, the constructor call in my class looks like this:
consumer = KafkaConsumer(
    topics,
    bootstrap_servers=self._service_uri,
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    client_id='my_consumer_name',
    group_id=self.GROUP_ID,
    security_protocol="SSL",
    ssl_cafile=self._ca_path,
    ssl_certfile=self._cert_path,
    ssl_keyfile=self._key_path,
)
¯\_(ツ)_/¯
Upvotes: 0
Reputation: 18475
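Within a consumer group, each partition of a topic can be consumed by only one consumer at a time.
If you do not set group_id (kafka-python defaults it to None), the consumer does not join any consumer group: group coordination and offset commits are disabled, and it reads all partitions of the subscribed topic on its own. That is why you see data in that case.
If you have multiple consumers running with the same group id, each partition is given to only one of them; with a single-partition topic, one consumer reads all the data while the others stay idle with no partitions assigned. That matches the "Updated partition assignment: []" line in your log.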
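To make the "one consumer per partition" rule concrete, here is a simplified, pure-Python model of range-style assignment for a single topic. It is a sketch, not kafka-python's actual assignor code; the function name range_assign and the member names are hypothetical. With one partition and two members, the second member ends up with an empty list, the same situation as the empty assignment in the question's log.

```python
def range_assign(partitions, members):
    """Simplified range assignment of one topic's partitions across group members."""
    if not members:
        return {}
    members = sorted(members)
    # Each member gets per_member partitions; the first `extra` members get one more
    per_member, extra = divmod(len(partitions), len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign([0], ["consumer-a", "consumer-b"]))  # {'consumer-a': [0], 'consumer-b': []}
```

Adding partitions (or removing idle consumers) is what lets every member in the group receive data.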
Upvotes: 5