Reputation: 23
I want to implement a Python-based receiver app with the Qpid Proton library to consume messages from a RabbitMQ stream queue over AMQP 1.0. For a proper implementation I need to track the offset of consumed messages. When restarting the receiver, I want to continue receiving from the last consumed offset instead of from the latest messages.
How can I pass the offset number to the Proton receiver so that it only reads from that offset onward? I can't find any documentation describing how to achieve this.
With the following attempts, no offset filtering was applied:
receiver.source.filter.put_dict({'x-stream-offset': ulong(1000)})
receiver.source.filter.put_dict({'rabbitmq:stream-offset-spec': ulong(1000)})
event.container.create_receiver(conn, "examples", options=Selector(u"x-stream-offset = '1000'"))
Upvotes: 0
Views: 190
Reputation: 23
The solution is to import symbol and use symbol("rabbitmq:stream-offset-spec") as the filter key.
Sample extract:
from proton import symbol, ulong
from proton.reactor import Filter

offset = 123  # offset to start consuming from
# The filter key is passed as an AMQP symbol rather than a plain string
offset_filter = Filter(filter_set={symbol("rabbitmq:stream-offset-spec"): ulong(int(offset))})
receiver = conn.create_receiver(address=self.address, options=offset_filter)
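To cover the tracking part of the question (remembering the last consumed offset across restarts), here is a minimal sketch of a file-based offset store. OffsetStore, its method names and the JSON file layout are hypothetical and not part of Proton or RabbitMQ. It also assumes that a numeric offset spec is inclusive, so the receiver should resume at the last processed offset plus one; verify that against your broker version.

```python
import json
import os
import tempfile


class OffsetStore:
    """Hypothetical helper: persists the last processed stream offset in a JSON file."""

    def __init__(self, path):
        self.path = path

    def load(self):
        """Return the last saved offset, or None if nothing has been saved yet."""
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                return int(json.load(f)["offset"])
        except FileNotFoundError:
            return None

    def save(self, offset):
        """Write to a temp file first, then rename, so a crash cannot leave a half-written file."""
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"offset": int(offset)}, f)
        os.replace(tmp_path, self.path)

    def resume_offset(self):
        """Offset to request on restart: one past the last processed message (assumed inclusive spec), or 0."""
        last = self.load()
        return 0 if last is None else last + 1
```

On startup you would pass store.resume_offset() as the offset in the filter above. In on_message, RabbitMQ should deliver each message's offset in the x-stream-offset message annotation (readable via event.message.annotations, as far as I can tell), which you would hand to store.save() once the message has been processed.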
Upvotes: 0