Chris

Kafka: one particular consumer group keeps rebalancing

A few months ago, I created two consumer groups (each with 3 consumers) in my Kafka chain:

This works without problems.

Recently, I added a third group to parallelize my chain:

Problem: the new topic3 is barely consumed (only 20 to 50 messages per topic get consumed after many hours), which leaves a huge lag (messages never consumed) on each of the 3 partitions of the third group.
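Lag here means, per partition, the log end offset minus the offset committed by the group. A minimal sketch of that arithmetic, assuming both values have already been fetched (the function name and the plain int partition keys are illustrative, not part of any library):

```python
from typing import Dict, Optional


def partition_lag(end_offsets: Dict[int, int],
                  committed: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Lag per partition = log end offset - committed offset.

    A partition with no committed offset (None or missing) is treated as
    unread from offset 0; this assumes the log still starts at offset 0.
    """
    lag = {}
    for partition, end in end_offsets.items():
        done = committed.get(partition)
        lag[partition] = end - (done if done is not None else 0)
    return lag
```

In kafka-python, the two inputs could come from `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed()` (keyed by `TopicPartition` rather than int); the stock `kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group group3` tool reports the same LAG column per partition.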

In the Kafka logs, I see many rebalances ONLY for the third group. It seems to hit the default max.poll.interval.ms of 5 minutes, but I'm not sure: the Kafka log message doesn't state the cause clearly (just "Preparing to rebalance group group3..."). I tried tuning the poll options, e.g. decreasing max.poll.records; things get better as I decrease it, but the rebalances continue. I also added CPU cores, going from 1 to 2 and finally 4 vCores, thinking there were too many consumers for the server's CPU: nothing really changes, and htop doesn't show heavy CPU usage.
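The max.poll.interval.ms hypothesis can be checked with simple arithmetic: a consumer that spends longer than max.poll.interval.ms (default 300000 ms) between two poll() calls is removed from the group, so with max_poll_records=100 each message must take on average less than 3 s. A minimal sketch, assuming you have measured the average per-message processing time yourself (`per_message_ms` and the 0.5 headroom factor are hypothetical inputs, not Kafka settings):

```python
import math

DEFAULT_MAX_POLL_INTERVAL_MS = 300_000  # Kafka / kafka-python default: 5 minutes


def worst_case_batch_ms(max_poll_records: int, per_message_ms: float) -> float:
    """Time needed to process one full batch returned by poll()."""
    return max_poll_records * per_message_ms


def safe_max_poll_records(per_message_ms: float,
                          max_poll_interval_ms: int = DEFAULT_MAX_POLL_INTERVAL_MS,
                          headroom: float = 0.5) -> int:
    """Largest batch whose processing fits in headroom * max_poll_interval_ms.

    headroom=0.5 keeps a 2x safety margin for slow messages (an assumed value).
    Never returns less than 1.
    """
    budget_ms = max_poll_interval_ms * headroom
    return max(1, math.floor(budget_ms / per_message_ms))
```

For example, at 3.5 s per message a batch of 100 takes 350 s and overruns the 300 s default, while `safe_max_poll_records(3500)` suggests 42. If a single message can take longer than the interval, no batch size helps; the remaining levers are raising max_poll_interval_ms or speeding up the processing.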

I only get this lag/rebalancing problem with the new consumer group, and its consumption code is the same as in the other consumers.

I don't know what to do. Any suggestions on the Kafka configuration, Kafka CPU usage, or anything else?

Note: the whole stack runs in Docker containers.

Thank you!

EDIT: Example 1 of the Kafka logs (generation loop WITHOUT reset to 0):

...
[2020-07-15 10:52:32,029] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 365 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:52:32,036] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 366 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:52:32,040] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 366 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:54:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 10:57:32,144] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:57:32,149] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 366 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:57:32,157] INFO [GroupCoordinator 1001]: Group group3 with generation 367 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:04:00,553] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:06:14,173] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,182] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 367 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,205] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 368 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,210] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 368 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,233] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,234] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 368 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,235] INFO [GroupCoordinator 1001]: Group group3 with generation 369 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:14:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:15:42,480] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,483] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 369 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,489] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 370 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,505] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 370 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,655] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,657] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 370 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,657] INFO [GroupCoordinator 1001]: Group group3 with generation 371 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:24:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:34:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:36:26,840] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,860] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 371 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,869] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 372 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,879] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 372 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,949] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,952] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 372 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,953] INFO [GroupCoordinator 1001]: Group group3 with generation 373 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:44:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:49:45,548] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,576] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 373 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,580] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 374 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,586] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 374 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:54:45,676] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:45,685] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 374 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:45,686] INFO [GroupCoordinator 1001]: Group group3 with generation 375 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,403] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,432] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 375 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,471] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 376 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,481] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 376 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:04:01,515] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:01,518] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 376 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:01,519] INFO [GroupCoordinator 1001]: Group group3 with generation 377 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:14:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:21:11,498] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,502] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 377 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,505] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 378 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,508] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 378 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:00,554] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:26:11,680] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:26:11,811] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 378 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:26:11,812] INFO [GroupCoordinator 1001]: Group group3 with generation 379 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:00,553] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:34:30,547] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-0775e634-fe08-40e9-9ae3-7080ec21dd8a for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,597] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 379 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-0775e634-fe08-40e9-9ae3-7080ec21dd8a with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,613] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 380 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,628] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 380 (kafka.coordinator.group.GroupCoordinator)
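One way to test the max.poll.interval.ms hypothesis against Example 1 is to measure how long each member stays in the group. A minimal sketch (the function names are illustrative) that pairs each "Assignment received" line with the next "has left" line for the same group:

```python
import re
from datetime import datetime

# Broker timestamp at the start of each log line, e.g. [2020-07-15 10:52:32,040]
TS = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]")


def parse_ts(line: str) -> datetime:
    m = TS.match(line)
    if m is None:
        raise ValueError(f"no timestamp in: {line[:40]}")
    return datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f")


def time_in_group(lines, group="group3"):
    """Seconds between each 'Assignment received' and the next 'has left'."""
    gaps = []
    assigned_at = None
    for line in lines:
        if f"Assignment received from leader for group {group}" in line:
            assigned_at = parse_ts(line)
        elif f"in group {group} has left" in line and assigned_at is not None:
            gaps.append((parse_ts(line) - assigned_at).total_seconds())
            assigned_at = None
    return gaps
```

On the first pair above (10:52:32,040 and 10:57:32,144) it returns about 300.1 s, and the other pairs in Example 1 show the same pattern, matching the 300000 ms default of max.poll.interval.ms. The "on LeaveGroup" reason is consistent with the client leaving the group itself because poll() was not called in time, rather than the broker expiring its heartbeats.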

Example 2 of the Kafka logs (generation loop WITH reset to 0):

...
[2020-07-15 12:10:02,637] INFO [GroupMetadataManager brokerId=1001] Group group3 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:10:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:19:22,210] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,211] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 0 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,212] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 1 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,214] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:20:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:24:22,271] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:22,272] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 1 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:22,272] INFO [GroupCoordinator 1001]: Group group3 with generation 2 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,398] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,398] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 2 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,399] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 3 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,401] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 3 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:30:29,420] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:29,420] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 3 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:29,421] INFO [GroupCoordinator 1001]: Group group3 with generation 4 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)

EDIT2: added the consumer part of the code

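from json import loads

from kafka import KafkaConsumer

# Method excerpt: self, brokers_env_var and consumeMessage come from the surrounding code.
self.consumer = KafkaConsumer('topic1',
                              bootstrap_servers=brokers_env_var,
                              auto_offset_reset='earliest',
                              enable_auto_commit=True,
                              session_timeout_ms=14000,
                              heartbeat_interval_ms=2000,
                              group_id='group3',
                              max_poll_records=100,
                              key_deserializer=lambda x: loads(x.decode('utf-8')),
                              value_deserializer=lambda x: loads(x.decode('utf-8')))

try:
    # Iterating the consumer polls Kafka in batches of up to max_poll_records messages;
    # processing a whole batch must take less than max.poll.interval.ms (default 300000 ms).
    for message in self.consumer:
        self.consumeMessage(message.value)
except Exception as error:
    print('Run error - error : ' + str(error))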
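A possible direction, not a confirmed fix: the loop above hides poll() inside the iterator, so nothing shows how long the work between two polls takes. The sketch below polls explicitly and times each batch against max.poll.interval.ms. `run`, `handle`, `warn_ratio` and `max_batches` are hypothetical names, and the numbers in `CONSUMER_KWARGS` are assumptions to tune, not values from this thread. The consumer is passed in, so the loop can be exercised without a broker:

```python
import logging
import time

log = logging.getLogger("group3-consumer")

# Keyword arguments for kafka-python's KafkaConsumer; values are assumptions to tune.
CONSUMER_KWARGS = dict(
    group_id="group3",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    max_poll_records=20,           # smaller batches -> shorter gap between polls
    max_poll_interval_ms=600_000,  # raise only if one batch can legitimately take longer
    session_timeout_ms=14_000,
    heartbeat_interval_ms=2_000,
)


def run(consumer, handle, max_poll_interval_ms, warn_ratio=0.8, max_batches=None):
    """Poll explicitly and time each batch against max.poll.interval.ms.

    `consumer` is anything with a kafka-python style poll(timeout_ms=...)
    returning {TopicPartition: [records]}; `handle` processes one record value.
    Returns the measured batch durations in milliseconds.
    """
    durations = []
    batches = 0
    while max_batches is None or batches < max_batches:
        records_by_tp = consumer.poll(timeout_ms=1000)
        start = time.monotonic()
        for records in records_by_tp.values():
            for record in records:
                handle(record.value)
        elapsed_ms = (time.monotonic() - start) * 1000
        durations.append(elapsed_ms)
        if elapsed_ms > warn_ratio * max_poll_interval_ms:
            log.warning("batch took %.0f ms, close to max.poll.interval.ms=%d",
                        elapsed_ms, max_poll_interval_ms)
        batches += 1
    return durations
```

With kafka-python installed, usage would look like `consumer = KafkaConsumer('topic1', bootstrap_servers=brokers_env_var, value_deserializer=..., **CONSUMER_KWARGS)` followed by `run(consumer, self.consumeMessage, CONSUMER_KWARGS['max_poll_interval_ms'])`. If the warnings fire, the batch size or the processing time is the problem, not the broker's CPU.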

Answers (1)

Ofek Hod

A consumer group only consumes messages from Kafka, so what you actually have is one producer that sends messages to topic1 and topic3, and two consumer groups: one consumes from topic1 and the other from topic3.

Sending the same data to multiple topics doesn't make sense. I suggest you send these messages only to topic1 and use two consumer groups:

  1. 3 consumers reading topic1.
  2. Another 3 consumers reading topic1.

Different consumer groups in Kafka track their offsets independently. This is why you can keep your data in a single topic and create several consumer groups that read it; two topics holding the same data take double the storage space and may add CPU overhead to manage them.
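A toy model of why one topic is enough: the data is stored once, and each group only keeps its own position in it. This is a deliberately simplified, single-partition illustration; `ToyTopic` is hypothetical and not a Kafka API:

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for one Kafka partition: an append-only log plus
    one committed offset per consumer group (a simplification)."""

    def __init__(self):
        self.log = []
        # group_id -> next offset to read (Kafka's committed-offset convention)
        self.committed = defaultdict(int)

    def produce(self, value):
        self.log.append(value)

    def consume(self, group_id, max_records):
        start = self.committed[group_id]
        batch = self.log[start:start + max_records]
        self.committed[group_id] = start + len(batch)  # only this group moves
        return batch
```

Reading with group "g1" never moves group "g2"'s position, and the messages are stored only once however many groups read them.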

Please make sure that consumers in the same group have the same group.id parameter.
For example, with 6 consumers, 3 of them use group id g1 and the other 3 use group id g2.
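To see what group.id changes in the six-consumer example, here is a toy model of partition assignment: within a group, the topic's partitions are split among its members, while each group independently receives all of them. It assumes a 3-partition topic1 and uses a simple round-robin split, not Kafka's actual assignor implementations:

```python
def assign(partitions, members):
    """Round-robin split of partitions among the members of ONE group."""
    assignment = {m: [] for m in members}
    for i, p in enumerate(partitions):
        assignment[members[i % len(members)]].append(p)
    return assignment


def assign_groups(partitions, consumers):
    """consumers maps consumer name -> group id; every group gets all partitions."""
    groups = {}
    for name, group_id in consumers.items():
        groups.setdefault(group_id, []).append(name)
    return {gid: assign(partitions, sorted(members)) for gid, members in groups.items()}
```

With c1 to c3 in g1 and c4 to c6 in g2, each consumer ends up with one partition, and both groups see every message of topic1.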