Reputation: 885
I recently found that a topic I've been using is multi-partition rather than single-partition. I need to reconfigure my consumer class to handle multiple partitions, but I'm a little confused. I am currently using an offset group; let's call it test_offset_group for the sake of the example below. Under normal circumstances I always parse linearly and move forward in time: as messages are added to the topic, I parse them and move on. In the event of a crash, or if I need to go back and re-run the feed for the previous day, I need to be able to seek by timestamp. Kafka is mandatory in this project, so I cannot change the streaming data service I'm using.
I configure my consumer like this:
test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group")
If I need to seek to a timestamp, I provide the timestamp and then seek using the following method:
test_consumer.poll()
tp = TopicPartition("test_topic", 0)
needed_date = datetime.timestamp(timestamp)
rec_in = test_consumer.offsets_for_times({tp: needed_date * 1000})
test_consumer.seek(tp, rec_in[tp].offset)
The above works perfectly for a single-partition consumer, but it feels clunky once you consider numerous partitions. I guess I could fetch the partitions using
test_consumer.partitions_for_topic("test_topic")
and then iterate over each of them, but that seems like I'm going against the grain of Kafka, and I feel like there should be an easier way to do this.
In summary: I'd like to understand how to seek to a set of offsets across multiple partitions while using the offset group functionality, and I'd like to confirm whether, by performing the above operation, I am effectively ignoring all partitions other than 0?
Upvotes: 0
Views: 2995
Reputation: 26885
Your logic is right; you just need to apply it to every partition assigned to this consumer instance.
You can retrieve the current assignment using assignment().
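As a rough sketch of that idea, the per-partition steps from the question can be wrapped in a loop over assignment(). The helper name seek_all_to_timestamp is hypothetical, and sending partitions with no message at or after the timestamp to the end of the log is an assumption about the desired behaviour, not something the question specifies.

```python
from datetime import datetime


def seek_all_to_timestamp(consumer, when):
    """Seek every partition currently assigned to `consumer` to the first
    offset whose timestamp is at or after `when` (a datetime).

    `consumer` is expected to be a kafka-python KafkaConsumer that has
    already joined its group (for example after a poll() call), so that
    assignment() is not empty.
    """
    # Kafka timestamps are expressed in milliseconds since the epoch.
    ts_ms = int(when.timestamp() * 1000)

    # assignment() returns the set of TopicPartition objects owned by
    # this consumer instance, not every partition of the topic.
    partitions = consumer.assignment()

    # One lookup covers all assigned partitions at once.
    offsets = consumer.offsets_for_times({tp: ts_ms for tp in partitions})

    for tp, found in offsets.items():
        if found is None:
            # Assumption: nothing at or after the timestamp in this
            # partition, so only new messages should be read from it.
            consumer.seek_to_end(tp)
        else:
            consumer.seek(tp, found.offset)
    return offsets
```

With the consumer from the question, this would be called as test_consumer.poll() followed by seek_all_to_timestamp(test_consumer, timestamp). If other consumers share test_offset_group, each instance only seeks the partitions it owns, so every instance would need to perform the same seek.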
Upvotes: 1