ca9163d9

Reputation: 29159

Kafka consumer receives messages when group_id is None, but receives nothing when group_id is set?

I have the following Kafka consumer. It works well when group_id is None: it receives all historical messages as well as my newly sent test message.

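import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset=auto_offset_reset,
    enable_auto_commit=enable_auto_commit,
    group_id=group_id,  # None works; any other value receives nothing
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for m in consumer:
    ...  # loop body not shown in the original post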

However, it receives nothing if I set group_id to some value. I ran the test producer to send new messages, and still nothing was received.

The consumer console does show the following messages:

2020-11-07 00:56:01 INFO     ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: []
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group

Upvotes: 1

Views: 5506

Answers (2)

erg

Reputation: 99

I know this is not the solution to the author's problem. Still, if you landed here, you might be hitting the same symptom for another reason, as I was.

At least for kafka-python v2.0.2 with an Aiven Kafka broker setup, the problem was solved by adding a dry call to consumer.poll() before iterating. This is especially weird since it is not required when no group_id is assigned.

This version prints nothing:

def get():
    for message in consumer:
        print(message.value)
    consumer.commit()

The version below works as expected and prints all messages in the topic since the last commit():

def get():
    consumer.poll()
    for message in consumer:
        print(message.value)
    consumer.commit()

For reference, the consumer is constructed like this:

    consumer = KafkaConsumer(
        topics,
        bootstrap_servers=self._service_uri,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        client_id='my_consumer_name',
        group_id=self.GROUP_ID,
        security_protocol="SSL",
        ssl_cafile=self._ca_path,
        ssl_certfile=self._cert_path,
        ssl_keyfile=self._key_path,
    )
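A small diagnostic along these lines may help tell whether an empty partition assignment is behind the silence. It is only a sketch: describe_assignment is a hypothetical helper name, and it assumes the object passed in behaves like a kafka-python KafkaConsumer (it relies on poll(), assignment() and partitions_for_topic()). Keep in mind that any records poll() returns here are consumed and will not be delivered again by the iterator.

```python
def describe_assignment(consumer, topic, timeout_ms=1000):
    """Summarize what a group consumer was assigned for one topic.

    Hypothetical helper; `consumer` is assumed to be a kafka-python
    KafkaConsumer (or anything exposing the same three methods).
    """
    # poll() lets the client join the group and receive its assignment.
    records = consumer.poll(timeout_ms=timeout_ms)
    # Partitions currently assigned to this consumer instance.
    assigned = sorted(
        tp.partition for tp in consumer.assignment() if tp.topic == topic
    )
    # Partitions the cluster reports for the topic (None if unknown).
    known = sorted(consumer.partitions_for_topic(topic) or set())
    return {
        "assigned": assigned,
        "known": known,
        "records_from_poll": sum(len(batch) for batch in records.values()),
    }


# Usage (requires a reachable broker and an existing consumer):
# print(describe_assignment(consumer, "my_topic"))
```

If "known" lists partitions but "assigned" stays empty, another member of the same group most likely owns them.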

¯\_(ツ)_/¯

Upvotes: 0

Michael Heil

Reputation: 18475

One partition of a topic can only be consumed by one consumer within the same ConsumerGroup.

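If you set group_id to None, kafka-python does not join any consumer group: group-coordinated partition assignment and offset commits are disabled, so the consumer reads all partitions of the topic on its own. That is why you see data being consumed in that case.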

If multiple consumers run with the identical group.id and the topic has only one partition, only one of them reads the data while the others stay idle. An idle consumer shows exactly the empty assignment from your log (Updated partition assignment: []).
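To make the rule concrete, here is a toy model of how partitions get spread over the members of one group. It is a simplified round-robin sketch, not Kafka's actual assignor; assign_partitions and the consumer names are made up for illustration.

```python
def assign_partitions(partitions, consumers):
    """Give every partition exactly one owner within a single group.

    Simplified round-robin model for illustration only; real Kafka
    assignors (range, round-robin, sticky) are more involved.
    """
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    ordered = sorted(consumers)
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# One partition, two consumers in the same group: the second one idles.
print(assign_partitions([0], ["consumer-a", "consumer-b"]))
# {'consumer-a': [0], 'consumer-b': []}
```

With more partitions than consumers, every member gets work; with fewer, the surplus members end up with an empty list, which matches the empty assignment in the question's log.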

Upvotes: 5
