Reputation: 3723
How to consume a Kafka message without auto commit, process it for a long time (4-60 mins), and commit it without suffering a rebalance and partition reassignment, or blocking other group consumers from consuming other messages.
I’m using a Python 3.8 Kafka consumer (kafka-python) to:
My problem is that the Kafka partitions often get reassigned to another member of the consumer group.
After poring over the documentation, I tried playing with these configuration properties:
max_poll_interval_ms
from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition


def consume_one_message_at_a_time(conf):
    conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
    group_id = conf.group_id
    group_conf = conf.consumer_groups[group_id]
    kafka_brokers = conf.KAFKA_BROKERS
    topic = group_conf.subscribe[0].name
    print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_brokers,
        group_id=group_id,
        enable_auto_commit=False,     # offsets are committed manually below
        max_poll_records=1,           # one message per poll()
        max_poll_interval_ms=1800000,  # 30 minutes between poll() calls
        # session_timeout_ms=1800000,
        # request_timeout_ms=1800002,
        # connections_max_idle_ms=1800003
        # heartbeat_interval_ms=1800000,
    )
    print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
    # Redundant but harmless: the topic was already passed to KafkaConsumer().
    consumer.subscribe([topic])
    for message in consumer:
        print(f"message is of type: {type(message)}")
        if not group_conf.use_cmd:
            do_something_time_consuming(message)  # can take 4-60 minutes
        else:
            if group_id == 'bots' and check_bot_id(message):
                bot_action(conf, group_conf, message)
            else:
                print(f'no action for group_id: {group_id}')
                print(f'key  : {message.key}')
                print(f'value: {message.value}')
        # Commit the offset of the *next* message for this partition.
        # Note: the partition-id set is passed as the free-form metadata field.
        meta = consumer.partitions_for_topic(message.topic)
        partition = TopicPartition(message.topic, message.partition)
        offsets = OffsetAndMetadata(message.offset + 1, meta)
        options = {partition: offsets}
        print(f'\noptions: {options}\n')
        response = consumer.commit(offsets=options)
When other group members subscribe, or finish their jobs and consume again, I get this error:
Traceback (most recent call last):
  File "./consumer_one_at_a_time.py", line 148, in <module>
    consume_one_message_at_a_time(_conf)
  File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
    response = consumer.commit(offsets=options)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
    self._coordinator.commit_offsets_sync(offsets)
  File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.
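The error message names two knobs. Since the code already uses max_poll_records=1, only max_poll_interval_ms is left, and the arithmetic shows why 30 minutes is not enough for a job that can take up to 60. This is a minimal sketch; `required_max_poll_interval_ms` and its `headroom` factor are hypothetical helpers, not kafka-python settings.

```python
def required_max_poll_interval_ms(max_poll_records, worst_case_record_ms, headroom=1.25):
    """Smallest max_poll_interval_ms that survives a full batch of worst-case records.

    `headroom` is a hypothetical safety margin, not a Kafka setting.
    """
    if max_poll_records < 1 or worst_case_record_ms < 0 or headroom < 1:
        raise ValueError("invalid arguments")
    return int(max_poll_records * worst_case_record_ms * headroom)


MINUTE_MS = 60 * 1000

# Values from the question: one record per poll(), up to 60 minutes of work each.
needed = required_max_poll_interval_ms(1, 60 * MINUTE_MS, headroom=1.0)
configured = 1800000  # max_poll_interval_ms used in the consumer above
print(needed, configured, needed > configured)
```

With these numbers the worst case needs 3,600,000 ms, twice the configured value, so any message that takes longer than 30 minutes will trigger the error above.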
After adding these configurations, I found that new consumers are blocked, i.e. they do not consume messages until one is committed!
session_timeout_ms=1800000,
request_timeout_ms=1800002,
connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,
I read that a background thread is supposed to send a heartbeat. Is there a way to send a heartbeat without polling?
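The background thread sends a heartbeat every heartbeat_interval_ms, and the broker only uses session_timeout_ms to decide whether that thread is still alive. The Kafka consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and should typically be no higher than one third of it, so the commented-out pair above (both 30 minutes) breaks that rule. A minimal sketch of the check, with a hypothetical helper name:

```python
def check_heartbeat_settings(session_timeout_ms, heartbeat_interval_ms):
    """Return a list of problems with a session timeout / heartbeat interval pair.

    Encodes the documented rule: the heartbeat interval must be lower than the
    session timeout and should typically be at most one third of it.
    """
    problems = []
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat interval is above one third of the session timeout")
    return problems


# The commented-out pair from the question: both set to 30 minutes.
print(check_heartbeat_settings(1800000, 1800000))
# Passes the check, but a consumer that dies without leaving the group cleanly
# may then go undetected for up to 30 minutes.
print(check_heartbeat_settings(1800000, 3000))
```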
Reputation: 3559
Is there a way to send a heartbeat without polling?
It already works like this: since Kafka 0.10.1.0, heartbeats are sent from a separate background thread (you can check this for more information).
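Because heartbeats already come from a background thread, what a long job still has to do is call poll() again within max.poll.interval.ms. One commonly used pattern for that (pause/resume, which is not part of this answer) runs the work in a worker thread while the main thread keeps polling paused partitions. This is a sketch under assumptions: `consumer` is a kafka-python `KafkaConsumer` with enable_auto_commit=False and only its `assignment`, `pause`, `resume`, `poll`, `seek` and `commit` methods are used; `process` stands for a hypothetical long-running handler. It does not fully handle a rebalance during processing: a revoked partition can still make `commit()` raise `CommitFailedError`, and a record may be processed twice, so a `ConsumerRebalanceListener` would be needed for that.

```python
import threading


def process_while_polling(consumer, records, process, poll_timeout_ms=1000):
    """Process every record of one poll() result in a worker thread while this
    thread keeps calling poll(), so max_poll_interval_ms never expires.

    `consumer` is assumed to be a kafka-python KafkaConsumer created with
    enable_auto_commit=False; `process` is a hypothetical long-running handler.
    """
    errors = []

    def work():
        try:
            for batch in records.values():
                for record in batch:
                    process(record)
        except Exception as exc:  # hand the failure back to the polling thread
            errors.append(exc)

    worker = threading.Thread(target=work, daemon=True)
    worker.start()
    try:
        while worker.is_alive():
            # Re-pause on every pass: a rebalance may replace the assignment,
            # and newly assigned partitions may start out unpaused.
            consumer.pause(*consumer.assignment())
            stray = consumer.poll(timeout_ms=poll_timeout_ms)
            # Rewind anything returned anyway so it is delivered again later.
            for tp, batch in stray.items():
                consumer.seek(tp, batch[0].offset)
        worker.join()
        if errors:
            raise errors[0]  # do not commit work that failed
        # With no arguments, commit() stores the consumed positions
        # (last processed offset + 1) for the assigned partitions.
        consumer.commit()
    finally:
        consumer.resume(*consumer.assignment())
```

In the question's code this would replace the `for message in consumer:` loop with a loop that calls `consumer.poll(timeout_ms=1000)` and hands any non-empty result, together with `do_something_time_consuming`, to `process_while_polling`. max.poll.interval.ms can then stay small, because the main thread polls every second or so regardless of how long the job runs.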
In general, a rebalance happens in these situations:
It seems that your situation is the last one: you poll records, but because of the long-running process you don't poll again within max.poll.interval.ms (30 minutes in your case). To solve this problem:
Increase max.poll.interval.ms. But this can lead to very long rebalances, because rebalance.timeout = max.poll.interval.ms.
Once a rebalance starts, the partitions of every consumer in the group are revoked, and Kafka waits for all consumers that are still sending heartbeats to call poll() (by polling, a consumer sends its JoinGroupRequest at that point) until the rebalance timeout, which equals max.poll.interval.ms, expires.
Let's say you set max.poll.interval.ms to 60 minutes and your process takes 50 minutes to finish. If a rebalance starts, for any of the reasons I mentioned above, in the fifth minute of your long process, Kafka will wait 45 minutes for your consumer to poll. During that whole period all consumers stay revoked, so consumption stops completely for this consumer group. So it's not a good idea IMHO (of course, it depends on your needs).
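The 50-minute example can be written out as a small calculation. This is a simplified model following the answer's reasoning (the rebalance timeout equals max.poll.interval.ms, and the busy member only rejoins when it polls again); it gives an upper bound and ignores other client-side details. The function name is hypothetical.

```python
def group_stall_minutes(process_minutes, rebalance_start_minute, max_poll_interval_minutes):
    """Upper bound on how long the whole group waits for one busy member.

    Simplified model: the busy member rejoins only when its job ends and it
    calls poll() again; the coordinator waits at most the rebalance timeout
    (= max.poll.interval.ms) before completing the rebalance without it.
    """
    remaining = max(0, process_minutes - rebalance_start_minute)
    return min(remaining, max_poll_interval_minutes)


print(group_stall_minutes(50, 5, 60))  # the answer's example -> 45
```

Every other member of the group is idle for that whole window, which is why the answer considers a very large max.poll.interval.ms a poor fit for jobs that run close to an hour.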