Reputation: 1603
I'm using the Bunny Ruby gem to send messages to and receive messages from a RabbitMQ server. How do I synchronously pop a message off the queue while putting a timeout on how long I wait (i.e. if no message arrives after 3 seconds, stop blocking)?
One obvious solution would be to just loop over pop calls until either the timeout has expired or a message is received, but that seems very inefficient. Is there a more elegant solution? I took a look at the documentation for bunny as well as the tutorials on the rabbitmq site but I'm not finding a solution for this particular scenario.
Upvotes: 2
Views: 1818
Reputation: 3050
As a slight variation on the code by @Ilya (https://stackoverflow.com/a/35126963/448858), I found that I had to create a thread that sleeps for the timeout and then shuts down the channel's work pool:
module Bunny
  class Queue
    def subscribe(opts = { block: false, timeout: 1000 }, &block)
      ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
      consumer = Consumer.new(@channel, self, ctag)
      consumer.on_delivery(&block)
      @channel.basic_consume_with(consumer)
      if opts[:block]
        Thread.new do
          sleep(opts[:timeout]/1000.0)
          @channel.work_pool.shutdown
        end
        @channel.work_pool.join
      end
    end
  end
end
Upvotes: 0
Reputation: 2400
I did not find a way to do it easily using Bunny, and what I propose here blocks without a timeout. It does, however, support one-message-per-call semantics. Given that Bunny internally uses a thread pool to receive messages, I figured that a simpler way might be to use a blocking queue such as Ruby's Queue class to transport the message from Bunny's thread pool to the calling thread. Something like the following:
# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new
# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
  @internal_queue.enq(payload) # this runs inside Bunny's pool
end
# the call to deq is blocking
response = @internal_queue.deq # this blocks the main thread till a
                               # message is pushed to the internal_q
You could maintain one @internal_queue for each AMQP channel you need to listen on. You could factor these parts out into separate methods and make a neat blocking API that returns you one message at a time.
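If a timeout is needed on top of this, one option is to wrap the blocking deq in Ruby's standard Timeout module. This is only a minimal sketch: deq_with_timeout is a hypothetical helper name, and a plain thread stands in for Bunny's delivery callback so that the snippet runs without a broker.

```ruby
require 'timeout'

# Hypothetical helper: wait up to `seconds` for one message.
# Returns the message, or nil if nothing arrived in time.
def deq_with_timeout(internal_queue, seconds)
  Timeout.timeout(seconds) { internal_queue.deq }
rescue Timeout::Error
  nil
end

internal_queue = Queue.new

# Stand-in for the block passed to queue.subscribe (assumption: in real
# code Bunny's pool would call internal_queue.enq(payload) instead).
producer = Thread.new do
  sleep 0.1
  internal_queue.enq('payload')
end

first  = deq_with_timeout(internal_queue, 3)   # a message arrives in time
second = deq_with_timeout(internal_queue, 0.2) # nothing arrives, gives up
producer.join
```

Timeout works by raising an exception in the waiting thread, which is acceptable around a simple queue pop but worth avoiding around code that holds resources.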
I later created a TimedWaitableQueue class that wraps a simple Array extended with MonitorMixin and uses mutex + condition variable semantics. That allowed blocking on a dequeue call with a timeout.
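The original TimedWaitableQueue is not shown, so the following is only a hypothetical reconstruction of the idea: an Array extended with MonitorMixin plus a condition variable. The method names enq and deq and the choice of seconds for the timeout are assumptions.

```ruby
require 'monitor'

# Hypothetical reconstruction; not the original class from this answer.
class TimedWaitableQueue
  def initialize
    @items = []
    @items.extend(MonitorMixin)      # gives the array a monitor (re-entrant lock)
    @not_empty = @items.new_cond     # condition variable tied to that monitor
  end

  # Called from the producer side (e.g. inside Bunny's delivery block).
  def enq(item)
    @items.synchronize do
      @items.push(item)
      @not_empty.signal
    end
  end

  # Blocks for at most `timeout` seconds.
  # Returns the next item, or nil if none arrived in time.
  def deq(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @items.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @not_empty.wait(remaining)   # releases the monitor while waiting
      end
      @items.shift
    end
  end
end
```

The while loop re-checks the array after every wakeup, so a spurious wakeup or a message taken by another consumer just leads to another wait for the remaining time.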
Upvotes: 1
Reputation: 21
To get this behavior I was forced to rewrite the basic subscribe method. I found that a timeout can be set when waiting on the channel, but subscribe has no such input parameter.
response = nil
subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
  Rails.logger.info "got message #{payload}"
  response = payload
  @channel.consumers[delivery_info.consumer_tag].cancel
end
def subscribe(opts = {block: false}, &block)
  ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)
  consumer.on_delivery(&block)
  @channel.basic_consume_with(consumer)
  if opts[:block]
    @channel.work_pool.join(opts[:timeout])
  end
end
Upvotes: 2