Reputation: 21476
Here's what I've tried so far:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = poll(30.0) # msg is of None type.
msg
almost always ends up being None
though. I think the issue might be that 'my-group'
has already consumed all the messages for 'my.topic'
... but I don't care whether a message has already been consumed or not - I still need the latest message. Specifically, I need the timestamp from that latest message.
I tried a bit more, and from this it looks like there are probably 25 messages in the topic, but I have no idea how to get at them:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
If there are no messages because the topic has never had anything written to it at all, how can I determine that? And if that's the case, how can I determine how long the topic has existed for? I'm looking to write a script that automatically deletes any topics that haven't been written to in the past X days (14 initially - will probably tweak it over time.)
Upvotes: 6
Views: 8039
Reputation: 31
If anyone still needs an example for case with multiple partitions; this is how I did it:
from confluent_kafka import OFFSET_END, Consumer
settings = {
'bootstrap.servers': "my.kafka.server",
'group.id': "my-work-group",
'auto.offset.reset': "latest"
}
def on_assign(consumer, partitions):
for partition in partitions:
partition.offset = OFFSET_END
consumer.assign(partitions)
consumer = Consumer(settings)
consumer.subscribe(["test-topic"], on_assign=on_assign)
msg = consumer.poll(1.0)
Upvotes: 3
Reputation: 101
I run into the same issue, and no example on this. In my case there is one partition, and I need to read the last message, to know the some info from that message to setup the consumer/producer component I have.
Logic is that start Consumer
, subscribe to topic, poll for message -> this triggers on_assign
, where the rewinding happens, by assigning the modified partitions back. After on_assign
finishes, the poll for msg
continues and reads the last message from topic.
settings = {
"bootstrap.servers": "my.kafka.server",
"group.id": "my-work-group",
"client.id": "my-work-client-1",
"enable.auto.commit": False,
"session.timeout.ms": 6000,
"default.topic.config": {"auto.offset.reset": "largest"},
}
consumer = Consumer(settings)
def on_assign(a_consumer, partitions):
# get offset tuple from the first partition
last_offset = a_consumer.get_watermark_offsets(partitions[0])
# position [1] being the last index
partitions[0].offset = last_offset[1] - 1
consumer.assign(partitions)
consumer.subscribe(["test-topic"], on_assign=on_assign)
msg = consumer.poll(6.0)
Now msg
is having the last message inside.
Upvotes: 9