daydr3amer
daydr3amer

Reputation: 326

Set offset for an unconnected kafka topic in a consumer group without stopping consumers

We have a single consumer group set up for the continuous migration multiple topics. There is a service running, which connects a new consumer instance to each topic. We have the ability to disable the consumers for single topics while keeping the others running.

Sometimes we have to reset the offset for a specific topic, so that the migration starts over. Is there a way to override the necessity to disable all of the consumers to do so? As the offset is being kept per topic basis in a consumer group, I don't see why we have to disable the connections to all of the other topics. If this is not possible, what is the benefit of reading multiple topics with a single consumer group?

Example:

service A -> consumer with group "migration" -> consumes topic A
service A -> consumer with group "migration" -> consumer for topic B is stopped
service A -> consumer with group "migration" -> consumes topic C

set offset for group "migration" for topic B
Error: Assignments can only be reset if the group 'migration' is inactive, but the current state is Stable.

Upvotes: 0

Views: 1168

Answers (1)

ahmed.ettoumi
ahmed.ettoumi

Reputation: 793

Each member of a consumer-group is assigned to topics-partitions, so until the group is UP (continuous poll/commit for assigned partition) you couldn't reset-offsets.

But

There is a solution allow you to do it without service-interruption (stopping consumers)

  • First : you should switch your partition assignor to "CooperativeStickyAssignor"

partition.assignment.strategy: [RangeAssignor,CooperativeStickyAssignor]

  • Restart all of application instances one by one
  • Remove The Range assignor

partition.assignment.strategy: [CooperativeStickyAssignor]

  • Restart all of application instances one by one

After this change, you could now:

  1. take-off the topic (where you'd like to setup a reset-offset) from the consumer topic's list.
  2. Build and redeploy your app => The cooperative assignor will revoke/remove the partitions of this topic from the assignation, without stopping the consume from other topics
  3. Reset your offsets with :

kafka-consumer-group --bootstrap-server xxxx --group GROUP_ID_NAME --topic TOPIC_NAME --reset-offset --to-xxx ... --execute

And then reput your topic on your consumer config then build and redeploy it.

Pay Attention : this feature only available from kafka-client version 2.4

More details here : https://www.confluent.io/fr-fr/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/

Upvotes: 1

Related Questions