SherwoodCH

Reputation: 23

Receive from specific offset with Qpid Proton + RabbitMQ Stream Queue (Core)

I want to implement a Python-based receiver app with the Qpid Proton library to consume messages from a RabbitMQ stream queue over AMQP 1.0. For a proper implementation I need to track the offset of consumed messages. When the receiver restarts, I want it to continue from the last consumed offset instead of from the latest messages.

How can I pass the offset number to the Proton receiver so that it only reads from that offset onward? I can't find any documentation describing how to do this.

With the following attempts, no offset filtering was applied:

receiver.source.filter.put_dict({'x-stream-offset': ulong(1000)})
receiver.source.filter.put_dict({'rabbitmq:stream-offset-spec': ulong(1000)})
event.container.create_receiver(conn, "examples", options=Selector(u"x-stream-offset = '1000'"))

Upvotes: 0

Views: 190

Answers (1)

SherwoodCH

Reputation: 23

The solution is to pass the filter key as an AMQP symbol rather than a plain string: import symbol and use symbol("rabbitmq:stream-offset-spec").

Sample extract:

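from proton import symbol, ulong
from proton.reactor import Filter

offset = 123  # offset to start receiving from

# The filter key must be an AMQP symbol; the value is the offset as an unsigned long.
offset_filter = Filter(filter_set={symbol("rabbitmq:stream-offset-spec"): ulong(int(offset))})
receiver = conn.create_receiver(address=self.address, options=offset_filter)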
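
To cover the restart part of the question, the last processed offset has to be stored somewhere durable. Below is a minimal sketch using only the standard library. The function names (load_offset, save_offset, resume_offset) and the file-based store are hypothetical, not part of Proton or RabbitMQ. It assumes you obtain each message's offset yourself (RabbitMQ is expected to attach it as an x-stream-offset message annotation, but verify that for your broker version) and that a numeric offset spec is inclusive, so resuming at last + 1 should avoid reprocessing the last message.

```python
import os
import tempfile


def load_offset(path, default=0):
    """Return the last stored offset, or `default` if nothing usable is stored."""
    try:
        with open(path, "r", encoding="ascii") as fh:
            return int(fh.read().strip())
    except (FileNotFoundError, ValueError):
        return default


def save_offset(path, offset):
    """Persist `offset` via a temp file and an atomic rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="ascii") as fh:
        fh.write(str(int(offset)))
    # os.replace swaps the file in one step, so readers never see a partial write.
    os.replace(tmp, path)


def resume_offset(path):
    """Offset to request on restart: one past the last processed message."""
    # -1 means "nothing processed yet", which makes the first request offset 0.
    return load_offset(path, default=-1) + 1
```

On startup, the value of resume_offset(path) would take the place of the hard-coded offset in the filter above, and save_offset(path, n) would be called after each message with offset n has been fully processed.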

Upvotes: 0
