Rubber Duck

Reputation: 3723

How to process a Kafka message for a long time (4-60 mins), without auto commit, and commit it without suffering a rebalance

How can I consume a Kafka message with auto commit disabled, process it for a long time (4-60 minutes), and then commit it without suffering a rebalance and partition reassignment, and without blocking other consumers in the group from consuming other messages?

I’m using a Python 3.8 Kafka consumer to:

My problem is that often the Kafka partitions get rebalanced to another consumer group member.

After poring over the documentation, I tried playing with these configuration properties:

When other group members subscribe, or finish their jobs and start consuming, I get this error:

    Traceback (most recent call last):
      File "./consumer_one_at_a_time.py", line 148, in <module>
        consume_one_message_at_a_time(_conf)
      File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
        response = consumer.commit(offsets=options)
      File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
        self._coordinator.commit_offsets_sync(offsets)
      File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
        raise future.exception # pylint: disable-msg=raising-bad-type
    kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
                rebalanced and assigned the partitions to another member.
                This means that the time between subsequent calls to poll()
                was longer than the configured max_poll_interval_ms, which
                typically implies that the poll loop is spending too much
                time message processing. You can address this either by
                increasing the rebalance timeout with max_poll_interval_ms,
                or by reducing the maximum size of batches returned in poll()
                with max_poll_records.
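As I understand the error message, the broker's condition boils down to the gap between poll() calls versus max.poll.interval.ms. A toy sketch of that check, just to make the failure mode concrete (300000 ms is Kafka's default max.poll.interval.ms; the job lengths are from my 4-60 minute range):

```python
# Illustrative only: the condition the group coordinator effectively enforces.
# If the gap between poll() calls exceeds max.poll.interval.ms, the consumer
# is evicted from the group and any later commit fails with CommitFailedError.
DEFAULT_MAX_POLL_INTERVAL_MS = 300_000   # Kafka default: 5 minutes

def commit_would_fail(processing_ms, max_poll_interval_ms=DEFAULT_MAX_POLL_INTERVAL_MS):
    """True if a job of this length outlives the poll interval."""
    return processing_ms > max_poll_interval_ms

# A 4-minute job fits inside the default interval; a 60-minute job does not.
assert not commit_would_fail(4 * 60 * 1000)
assert commit_would_fail(60 * 60 * 1000)
```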

After adding these configurations, I found that new consumers are blocked! I.e., they do not consume messages until one is committed!

    session_timeout_ms=1800000,
    request_timeout_ms=1800002,
    connections_max_idle_ms=1800003
    # heartbeat_interval_ms=1800000,

I read that a background thread is supposed to send a heartbeat. Is there a way to send a heartbeat without polling?

Upvotes: 4

Views: 9232

Answers (1)

H.Ç.T

Reputation: 3559

Is there a way to send a heartbeat without polling?

It already works like this. The heartbeat has been sent from a separate thread in Kafka since version 0.10.1.0 (you can check KIP-62 for more information).

In general, a rebalance happens in these situations:

  • A new consumer joins the consumer group
  • New partitions are added
  • A consumer shuts down cleanly
  • A consumer is considered dead by Kafka
    • session.timeout.ms expires without a heartbeat being sent
    • max.poll.interval.ms expires without a poll request

It seems that your situation is the last one: you poll records but don't poll again within max.poll.interval.ms (30 minutes in your case) because of the long-running process. To solve this problem:

  • You can increase max.poll.interval.ms. But it can lead to very long rebalances, because rebalance.timeout = max.poll.interval.ms. When a rebalance starts, all the consumers in the group have their partitions revoked, and Kafka waits for every consumer that is still sending heartbeats to call poll() (by polling, consumers send a JoinGroupRequest at that point), until the rebalance timeout expires, which is equal to max.poll.interval.ms. Let's say you set max.poll.interval.ms to 60 minutes and your process takes 50 minutes to finish. If a rebalance starts, for any of the reasons I mentioned above, in the fifth minute of your long process, then Kafka will wait up to 45 minutes for your consumer to poll. During this period all the consumers are revoked (consuming is completely stopped for this consumer group), so it's not a good idea IMHO. (Of course, it depends on your needs.)
  • Another solution is not to use Kafka for this kind of long-running operation, because Kafka is not suitable for long-running processing. You can persist metadata about long processes as part of message consumption with Kafka, and then perform the appropriate operation without using Kafka.
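A minimal sketch of that second approach, with the long work handed off to a background worker so the consumer loop stays fast (this is pure stdlib Python with the Kafka calls stubbed out; in a real consumer you would poll(), enqueue the message, and commit its offset immediately, then run the job outside the poll loop):

```python
import queue
import threading

# Stand-in for the long-running work (4-60 minutes in the question).
def long_running_job(payload):
    return f"processed:{payload}"

results = []
jobs = queue.Queue()

def worker():
    # Runs outside the consumer poll loop, so poll()/heartbeats can continue.
    while True:
        payload = jobs.get()
        if payload is None:          # sentinel: shut down the worker
            break
        results.append(long_running_job(payload))
        jobs.task_done()

t = threading.Thread(target=worker, daemon=True)
t.start()

# In the real consumer loop: poll() -> persist/enqueue -> commit the offset
# right away, so the group never waits on the long job.
for message_value in ["job-1", "job-2"]:
    jobs.put(message_value)          # hand off; the commit would happen here

jobs.join()                          # wait for the queued work to drain
jobs.put(None)
t.join()
print(results)                       # -> ['processed:job-1', 'processed:job-2']
```

The trade-off is that the offset is committed before processing finishes, so the job state (and retries on failure) must be tracked in the external store rather than via Kafka offsets.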

Upvotes: 6
