Reputation: 143
I am continuously producing messages to a topic using pykafka:
producer.produce('test')
I would like to get the most recent message. I found a solution on the pykafka GitHub page which suggests:
from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="xxxxxxx")
topic = client.topics['mytopic']
consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)
LAST_N_MESSAGES = 2
offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.iteritems()]
consumer.reset_offsets(offsets)
consumer.consume()
However, I don't really understand what is going on here, and it only gets the most recent message if there are at least two messages already there.
Is there a more robust solution?
Upvotes: 1
Views: 4476
Reputation: 6207
It's important to define precisely what you mean by "most recent message". In a Kafka topic with more than one partition, it's actually not possible to know which of the most recent messages on each partition is the globally most recent message without examining the message contents. It's also important to define when you want to get the most recent message(s) - do you want them once, right now? Do you want to start consuming from the most recent message and then continue consuming messages as they're added to the topic? Do you want to periodically get only the newest N messages?
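To illustrate the first point: with several partitions, each partition has its own last message, and choosing the globally newest one means comparing something inside the messages. Here is a minimal sketch, assuming (hypothetically) that every payload is JSON carrying a producer-set `ts` field. The helper `newest_overall` and the field name are illustrative and not part of PyKafka.

```python
import json


def newest_overall(last_per_partition):
    """Pick the globally newest payload among per-partition last messages.

    last_per_partition maps a partition id to the raw payload (bytes) of the
    last message on that partition. Assumes each payload is JSON with a
    numeric "ts" field set by the producer (a hypothetical convention).
    """
    if not last_per_partition:
        return None
    # Decode every candidate so the timestamps can be compared.
    decoded = {
        pid: json.loads(raw.decode("utf-8"))
        for pid, raw in last_per_partition.items()
    }
    # The partition whose last message has the largest timestamp wins.
    newest_pid = max(decoded, key=lambda pid: decoded[pid]["ts"])
    return newest_pid, decoded[newest_pid]
```

If your messages carry no ordering information of their own, there is nothing to compare, and "the last message per partition" is the most precise answer available.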
The recipe you included above (the basis of which I wrote for the PyKafka documentation) gives you the last N messages per partition for your choice of N. If you want to get only the last message, you can simply set LAST_N_MESSAGES to 1. Essentially, the recipe examines the latest offset consumed per partition, then resets the consumer's offset to exactly LAST_N_MESSAGES before that. When you consume from this point, you get only the last N messages of each partition.
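The failure on nearly empty partitions comes from the subtraction going negative. A sketch of the offset arithmetic with a guard for that case, written as plain Python so it can be checked without a broker. The helper name `rewind_targets` is hypothetical, and the constant `EARLIEST = -2` is assumed to match the value of `OffsetType.EARLIEST` (the Kafka convention for "start of the partition"); verify both against your PyKafka version.

```python
EARLIEST = -2  # assumed to equal pykafka.common.OffsetType.EARLIEST


def rewind_targets(last_consumed, n):
    """Compute (partition, offset) pairs that rewind each partition by n.

    last_consumed maps a partition key to the last offset consumed on it.
    Any target that would fall before the start of the partition is replaced
    by the EARLIEST sentinel instead of a negative offset.
    """
    targets = []
    for partition, offset in last_consumed.items():
        target = offset - n
        if target < 0:
            # Fewer than n messages available: start from the beginning.
            target = EARLIEST
        targets.append((partition, target))
    return targets
```

The resulting pairs have the same shape as the `offsets` list in the recipe, so in principle they could be handed to `consumer.reset_offsets(...)`. Treat that as an assumption to test, since the recipe relies on the consumer's private `_partitions` attribute.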
All of that said, if you're simply interested in starting to consume from the end of the topic, you can use this:
consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)
and start consuming normally.
Upvotes: 4