Marina

Reputation: 4064

How to specify starting position for Kafka consumers when using dynamic group re-balancing?

Is it possible to start 0.9 or 0.10 Kafka consumers from a specified offset while still using consumer groups with dynamic re-balancing?

Here is what I have found so far:

Case 1: If we use the consumer.assign(…) method to manually assign partitions to consumers, we can use any of the calls below:

consumer.seek(<specificPartition>, <myCustomOffset>);
// or:
consumer.seekToBeginning(<specificPartition>);
// or:
consumer.seekToEnd(<specificPartition>);

Basically, we have full control over the position the consumer starts from, BUT at the expense of Kafka no longer re-assigning partitions dynamically.
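For comparison, Case 1 might look like the following minimal sketch, assuming the kafka-clients `Consumer` interface; the class `ManualAssignStart` and method `assignAndSeek` are hypothetical names. Because `assign()` takes effect immediately, `seek()` can be called before the first `poll()`:

```java
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper illustrating Case 1: manual assignment, no group rebalancing.
public class ManualAssignStart {

    // Assigns exactly the given partitions to this consumer and positions each
    // of them at startOffset; the next poll() fetches from that offset.
    static void assignAndSeek(Consumer<String, String> consumer,
                              List<TopicPartition> partitions,
                              long startOffset) {
        consumer.assign(partitions);          // takes effect immediately, no group join
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, startOffset);   // allowed because tp is now assigned
        }
    }
}
```

With `assign()`, Kafka does not move partitions between consumers when group members come or go; that is the trade-off described above.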

Case 2: If we use the consumer.subscribe(…) method, Kafka manages the re-balancing; however, we cannot do any of the three options above, because no partitions are assigned to the consumer until it calls poll() … :( So we tried the following “hack” around it, at consumer start-up time, before entering the poll() loop:

// FieldUtils is org.apache.commons.lang3.reflect.FieldUtils (Apache Commons Lang);
// ConsumerCoordinator is the internal class org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
// get coordinator from the private field of the consumer:
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);
// make sure this consumer has received its partition assignment from the group coordinator:
coordinator.ensurePartitionAssignment();
// get the set of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment();
// now we can apply the same three actions (seek(), seekToEnd() or seekToBeginning()),
// but only to the partitions assigned to this consumer:
for (TopicPartition assignedPartition : assignedTopicPartitions) {
    consumer.seek(assignedPartition, myCustomOffset); // or whatever
    // ...
}
// now start the poll() loop:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord<String, String> record : records) {
         // processMessage(record.value(), record.offset());
    }
}

This feels too hacky for my taste, and I am also not sure whether this logic will hold during an actual re-balance, when, say, new consumers are added to the group.

Could somebody validate this approach or suggest a better way to accomplish what we need?

thanks!

Upvotes: 4

Views: 10712

Answers (1)

Matthias J. Sax

Reputation: 62330

Instead of using ConsumerCoordinator, you can just do an initial poll() (and not process anything it returns) to get partitions assigned. Afterwards, use seek() and start your poll loop as shown in your code.
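A minimal sketch of that suggestion, assuming the 0.10-era kafka-clients API used in the question (`poll(long)`; newer clients use `poll(Duration)` instead). The class `SubscribeThenSeek` and method `joinAndSeek` are hypothetical names. The first `poll()` is only there to trigger the group join and partition assignment; whatever it returns is ignored, since `seek()` resets the position afterwards:

```java
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper illustrating the answer: subscribe, poll once to get
// partitions assigned, then seek before the real poll loop starts.
public class SubscribeThenSeek {

    static Set<TopicPartition> joinAndSeek(Consumer<String, String> consumer,
                                           String topic,
                                           long startOffset,
                                           long pollTimeoutMs) {
        consumer.subscribe(Collections.singletonList(topic));
        // Initial poll: joins the group and receives an assignment.
        // Any records it returns are deliberately ignored.
        consumer.poll(pollTimeoutMs);
        Set<TopicPartition> assigned = consumer.assignment();
        // If the assignment is still empty (join not finished within the timeout),
        // this sketch simply seeks nothing; a real application would need to retry.
        for (TopicPartition tp : assigned) {
            consumer.seek(tp, startOffset);
        }
        return assigned;
    }
}
```

As the question suspects, this only positions the partitions assigned at start-up: partitions that move to this consumer in a later rebalance resume from the group's committed offsets (or `auto.offset.reset`), not from `startOffset`.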

Upvotes: 3
