Is it possible to start 0.9 or 0.10 Kafka consumers from a specified offset while still using consumer groups with dynamic re-balancing?
Here is what we have found so far:
Case 1: If we use the consumer.assign(…) method to assign partitions to consumers manually, we can do any of the actions below:
consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);
Basically, we have full control over the position the consumer starts from, BUT at the expense of Kafka no longer re-assigning partitions dynamically.
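A minimal sketch of Case 1, assuming the kafka-clients library is on the classpath. The class name `ManualAssignSeek` and the `startOffsets` map are hypothetical, chosen for illustration. The helper is written against the `Consumer` interface so that it accepts a real `KafkaConsumer` or a `MockConsumer` alike:

```java
import java.util.ArrayList;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Case 1 sketch: manual assignment, so Kafka's group re-balancing is bypassed.
final class ManualAssignSeek {

    private ManualAssignSeek() {}

    // Assigns exactly the partitions in startOffsets (a hypothetical,
    // caller-supplied map) and positions each one at its requested offset.
    static <K, V> void assignAndSeek(Consumer<K, V> consumer,
                                     Map<TopicPartition, Long> startOffsets) {
        // A List fits both the 0.9 assign(List) and the later assign(Collection) signature.
        consumer.assign(new ArrayList<>(startOffsets.keySet()));
        for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
            // seek() only changes the in-memory position; the next poll() fetches from there.
            consumer.seek(entry.getKey(), entry.getValue());
        }
    }
}
```

Typical use would be `ManualAssignSeek.assignAndSeek(consumer, Collections.singletonMap(new TopicPartition("my-topic", 0), 42L));` followed by the usual poll() loop, where `"my-topic"` and `42L` are placeholder values.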
Case 2: If we use the consumer.subscribe(…) method, Kafka manages the re-balancing; however, we cannot do any of the three options above, because at start-up the consumer has no partitions assigned yet :( So we tried the following to “hack” around it at consumer start-up time, before entering the poll() loop:
// get the coordinator from the consumer's private "coordinator" field
// (FieldUtils is org.apache.commons.lang3.reflect.FieldUtils from Apache Commons Lang):
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);
// make sure partitions have already been assigned to this consumer:
coordinator.ensurePartitionAssignment();
// get the set of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment();
// now we can do the same three actions as above (seek(), seekToEnd() or seekToBeginning()) on those partitions, for this consumer only:
for (TopicPartition assignedPartition : assignedTopicPartitions) {
    consumer.seek(assignedPartition, myCustomOffset); // myCustomOffset: our chosen start offset, or whatever
    ...
}
// now start the poll() loop:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord<String, String> record : records) {
        // processMessage(record.value(), record.offset());
    }
}
This feels too hacky for my taste, and I am also not sure whether this logic will hold during an actual re-balancing, when, say, new consumers are added to the group.
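A technique the original code does not use, but which targets the re-balancing concern, is to do the seek inside a `ConsumerRebalanceListener` passed to `subscribe()`: Kafka invokes `onPartitionsAssigned` from within `poll()`, both at start-up and after later rebalances, and the listener's own javadoc shows a similar seek-on-assignment pattern. This is a minimal sketch, assuming kafka-clients on the classpath; the class name `SeekOnAssignListener` and the offsets map are hypothetical:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Seeks a partition to its custom start offset the first time this consumer
// instance is assigned that partition, including partitions gained in a
// later rebalance (the callbacks run inside poll()).
final class SeekOnAssignListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    // Hypothetical: partition -> offset to start from; each entry is used once.
    private final Map<TopicPartition, Long> pendingStartOffsets;

    SeekOnAssignListener(Consumer<?, ?> consumer, Map<TopicPartition, Long> startOffsets) {
        this.consumer = consumer;
        this.pendingStartOffsets = new HashMap<>(startOffsets); // copy; caller's map stays untouched
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do in this sketch; committing processed offsets here is a common choice.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long offset = pendingStartOffsets.remove(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            }
            // Partitions without a pending entry keep their committed (or auto.offset.reset) position.
        }
    }
}
```

It would be registered with something like `consumer.subscribe(Arrays.asList("my-topic"), new SeekOnAssignListener(consumer, startOffsets));` before the poll() loop, with `"my-topic"` as a placeholder. Removing each entry after the first seek means a partition handed back to the same instance resumes from its committed offset rather than jumping back again; keeping the entries instead would re-apply the custom offset on every assignment. Either way, a different consumer instance that picks up the partition applies its own map.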
Could somebody validate this reflection-based approach or suggest a better way to accomplish what we need? Thanks!
Answer:
Instead of using ConsumerCoordinator, you can just do an initial poll() (and not process anything) to get partitions assigned. Afterwards, use seek() and start your poll loop as shown in your code.
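A minimal sketch of that suggestion, assuming a 2.0+ kafka-clients library (which has `poll(Duration)`; the 0.9/0.10 clients from the question use `poll(long)` instead). In 2.0+, a short `poll(Duration)` is not guaranteed to finish joining the group, so the sketch polls in a bounded loop; the class name, `maxPolls` and the 100 ms step are hypothetical choices:

```java
import java.time.Duration;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class InitialPollThenSeek {

    private InitialPollThenSeek() {}

    // Polls (without processing) until this group member has partitions, or
    // maxPolls attempts are used up, then seeks every assigned partition to
    // startOffset. Returns the partitions that were repositioned.
    static <K, V> Set<TopicPartition> seekAfterInitialPoll(Consumer<K, V> consumer,
                                                          long startOffset,
                                                          int maxPolls) {
        for (int i = 0; i < maxPolls && consumer.assignment().isEmpty(); i++) {
            // Records returned here are ignored on purpose: seek() below overrides
            // the position, so the next poll() fetches from startOffset instead.
            consumer.poll(Duration.ofMillis(100));
        }
        Set<TopicPartition> assigned = consumer.assignment();
        for (TopicPartition tp : assigned) {
            consumer.seek(tp, startOffset);
        }
        return assigned;
    }
}
```

After `consumer.subscribe(...)`, calling `InitialPollThenSeek.seekAfterInitialPoll(consumer, myCustomOffset, 50)` and then entering the normal poll() loop mirrors the suggestion. Like the reflection hack, it repositions only the partitions assigned at start-up; partitions gained in a later rebalance are not re-seeked.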