StormsEdge

Reputation: 885

Properly Seek and Consume Kafka Messages on Multipartition Topic

I recently found that a topic I've been using has multiple partitions rather than a single one. I need to reconfigure my consumer class to handle the multiple partitions, but I'm a touch confused. I am currently using a consumer group (group_id); let's call it test_offset_group for the sake of the example below. Under normal circumstances I always consume linearly and move forward in time: as messages are added to the topic, I parse them and move on. But in the event of a crash, or when I need to go back and re-run the feed for the previous day, I need to be able to seek by timestamp. Kafka is mandatory in this project, so I can't switch to a different streaming data service.

I configure my consumer like this:

from kafka import KafkaConsumer

test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group")

When I need to seek to a timestamp, I provide one and seek like this:

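from datetime import datetime
from kafka import TopicPartition

test_consumer.poll()  # lets the consumer join the group and receive its partition assignment
tp = TopicPartition("test_topic", 0)
needed_date = datetime.timestamp(timestamp)  # seconds since the epoch, as a float
rec_in = test_consumer.offsets_for_times({tp: int(needed_date * 1000)})  # Kafka expects integer milliseconds
test_consumer.seek(tp, rec_in[tp].offset)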
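The `needed_date` conversion is where timestamp bugs tend to hide: kafka-python's `offsets_for_times()` takes integer milliseconds since the epoch, while `datetime.timestamp()` returns float seconds and interprets a naive datetime as local time. A minimal sketch of a stricter conversion, assuming naive datetimes should be read as UTC (change that if the feed is in local time); `to_kafka_ms` is a hypothetical helper name, not part of kafka-python:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_kafka_ms(dt):
    """Convert a datetime to epoch milliseconds (int), as Kafka timestamps use.

    Assumption: a naive datetime is treated as UTC. datetime.timestamp()
    would instead treat it as local time.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # timedelta // timedelta is exact integer arithmetic, so there is no
    # float rounding error from multiplying seconds by 1000.
    return (dt - EPOCH) // timedelta(milliseconds=1)
```

With this, the lookup becomes `test_consumer.offsets_for_times({tp: to_kafka_ms(timestamp)})`.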

The above works perfectly for a single-partition consumer, but it feels clunky once there are numerous partitions. I could fetch the partitions with test_consumer.partitions_for_topic('test_topic') and then iterate over each of them, but that seems like I'm going against the grain of Kafka, and I feel there should be an easier way to do this.

In summary: I'd like to understand how to seek to the right offsets on multiple partitions while using the consumer group functionality, and I'd like to confirm: with the code above, am I effectively ignoring all partitions other than 0?

Upvotes: 0

Views: 2995

Answers (1)

Mickael Maison

Reputation: 26885

Your logic is right; you just need to perform it on all partitions assigned to this consumer instance.

You can retrieve the current assignment with test_consumer.assignment(), which returns a set of TopicPartition objects.
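Applied to every assigned partition, the question's single-partition code might look like the sketch below. It assumes a kafka-python `KafkaConsumer` that is already subscribed with a `group_id`; `seek_assigned_to_time`, `max_wait_s`, and the choice to send partitions with no matching record to the log end are illustrative assumptions, not part of the answer above.

```python
import time


def seek_assigned_to_time(consumer, timestamp_ms, max_wait_s=30.0):
    """Seek every partition assigned to `consumer` to the first offset whose
    record timestamp is >= timestamp_ms (epoch milliseconds).

    Hypothetical helper. `consumer` is assumed to be a kafka-python
    KafkaConsumer that is already subscribed. Returns the set of
    partitions that were repositioned.
    """
    # Under group management, partitions are assigned only after the consumer
    # joins the group, which happens inside poll(); one poll may not be enough.
    deadline = time.monotonic() + max_wait_s
    while not consumer.assignment():
        if time.monotonic() > deadline:
            raise TimeoutError("no partitions were assigned to this consumer")
        consumer.poll(timeout_ms=100)

    partitions = consumer.assignment()
    # One offsets_for_times call covering every assigned partition.
    found = consumer.offsets_for_times({tp: timestamp_ms for tp in partitions})

    past_end = []
    for tp in partitions:
        hit = found.get(tp)
        if hit is None:
            # No record at or after timestamp_ms in this partition.
            past_end.append(tp)
        else:
            consumer.seek(tp, hit.offset)
    if past_end:
        # Assumption: such partitions should only receive new records.
        consumer.seek_to_end(*past_end)
    return partitions
```

With the question's consumer this would be called as `seek_assigned_to_time(test_consumer, int(needed_date * 1000))` in place of the partition-0 block. One caveat: partitions that move to this consumer in a later rebalance start from the group's committed offsets, not from the seek target. If that matters, kafka-python lets you pass a `ConsumerRebalanceListener` to `subscribe(topics, listener=...)` and perform the same seek in its `on_partitions_assigned` callback.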

Upvotes: 1
