Reputation: 11032
I have a Python process (or rather, set of processes running in parallel within a consumer group) that processes data according to inputs coming in as Kafka messages from certain topic. Usually each message is processed quickly, but sometimes, depending on the content of the message, it may take a long time (several minutes). In this case, Kafka broker disconnects the client from the group and initiates the rebalance. I could set session_timeout_ms
to a really large value but it would be like 10 minutes of more, which means if a client dies, the cluster would not be properly rebalanced for 10 minutes. This seems to be a bad idea. Also, most messages (about 98% of them) are fast, so paying such penalty for just 1-2% of messages seems wasteful. OTOH, large messages are frequent enough to cause a lot of rebalances and cost a lot of performance (since while the group is rebalancing, nothing is getting done, and then the "dead" client re-joins again and causes another rebalance).
So, I wonder, are there any other ways for handling messages that take a long time to process? Is there any way to initiate heartbeats manually to tell the broker "it's ok, I am alive, I'm just working on the message"? I thought the Python client (I use kafka-python 1.4.7
) was supposed to do that for me but it doesn't seem to happen. Also, the API doesn't seem to even have separate "heartbeat" function at all. And as I understand, calling poll()
would actually get me the next messages - while I am not even done with the current one, and would also mess up iterator API for Kafka consumer, which is quite convenient to use in Python.
In case it's important, the Kafka cluster is Confluent, version 2.3 if I remember correctly.
Upvotes: 2
Views: 11283
Reputation: 3832
In Kafka, 0.10.1+ Kafka polling and session heartbeat are decoupled to each other. You can get an explanationhere
max.poll.interval.ms how much time permit to complete processing by consumer instance before time out means if processing time takes more than max.poll.interval.ms time Consumer Group will presume its die remove from Consumer Group and invoke rebalance.
To increase this will increase the interval between expected polls which give consumers more time to handle a batch of records returned from poll(long). But at the same time, it will also delay group rebalances since the consumer will only join the rebalance inside the call to poll.
session.timeout.ms is the timeout used to identify if the consumer is still alive and sending a heartbeat on a defined interval (heartbeat.interval.ms). In general, the thumb-rule is heartbeat.interval.ms should be 1/3 of session timeout so in case of network failure consumers can miss at most 3-time heartbeat before session timeout.
session.timeout.ms: low value would be good to detect failure more quickly.
max.poll.interval.ms: large value will reduce the risk of failure due to increased processing time however increases the rebalancing time.
Note: A large number of partition and topics consumed by Consumer Group also effect on overall rebalance time
The other approach if you would really want to get rid of rebalancing you can assign partitions on each consumer instance manually, using partition assign. In that case, each consumer instance will be running independently with their own assigned partitions. But in that case, you would not able to leverage the rebalance features to assign partitions automatically.
Upvotes: 7