Reputation: 11697
With the Java RabbitMQ client I can do this (code in JRuby):
consumer = QueueingConsumer.new(channel);
channel.basicConsume(queue_name, true, consumer);
consumer.nextDelivery.getBody
The third line then blocks the thread until a message arrives. How can I achieve the same thing with the Bunny client? As far as I can tell, I can only use a blocking subscribe:
channel.queue('').bind(@x, :routing_key => rk).subscribe(block: true) do |_, _, payload|
# do something
end
or a non-blocking pop:
delivery_info, properties, payload = q.pop
Is there a way to get the same behavior as the JRuby client using Bunny? I want this because, after receiving a message, I would like to continue the job in my current context.
Upvotes: 1
Views: 3048
Reputation: 16525
I needed to receive a single message from a queue. Bunny's Queue#pop is non-blocking and does not have an option to wait. I also needed to support a timeout, and I ended up implementing this:
require "thread"
mutex = Mutex.new
var = ConditionVariable.new
payload = nil
consumer = queue.subscribe(block: false) do |_, _, x_payload|
mutex.synchronize do
payload = x_payload
var.signal
end
end
mutex.synchronize do
deadline = Time.now + 10
while payload.nil? && (remaining = deadline - Time.now) > 0
var.wait(mutex, remaining)
end
end
consumer.cancel
raise "timed out waiting for response" if payload.nil? # nil? works in plain Ruby; blank? needs ActiveSupport
Inspired in part by https://spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/
This code has not been battle-tested. It works on staging with a single message. It may not work at scale. I was going to hide all of this complexity in a monkey-patch to Bunny::Queue. Callers would have seen a simple blocking_pop(timeout:) API.
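For illustration, here is a rough sketch of what such a blocking_pop helper might look like as a standalone method rather than a monkey-patch. It is not part of Bunny's API; the method name and the inbox variable are hypothetical. It assumes only that the queue responds to subscribe(block: false) with a block, as Bunny::Queue does, and that the returned consumer responds to cancel. It swaps the Mutex/ConditionVariable pair for Ruby's thread-safe Queue plus Timeout from the standard library.

```ruby
require "thread"
require "timeout"

# Hypothetical helper, not provided by Bunny.
# Assumes `queue` responds to subscribe(block: false) { |info, props, payload| ... }
# and that the returned consumer responds to cancel.
# Returns [delivery_info, properties, payload], or nil if the timeout expires.
def blocking_pop(queue, timeout:)
  inbox = Queue.new # Ruby's thread-safe Queue, not Bunny::Queue

  # The subscription callback runs on another thread and hands the delivery over.
  consumer = queue.subscribe(block: false) do |delivery_info, properties, payload|
    inbox << [delivery_info, properties, payload]
  end

  begin
    # Block the calling thread until a delivery arrives or the timeout expires.
    Timeout.timeout(timeout) { inbox.pop }
  rescue Timeout::Error
    nil
  ensure
    # Always stop consuming, whether a message arrived or not.
    consumer.cancel
  end
end
```

A caller would then write something like `_, _, payload = blocking_pop(q, timeout: 10)` and check for nil. Like the code above, this is untested against a real broker; with automatic acknowledgements, any further messages delivered before the consumer is cancelled would likely be consumed and discarded by this sketch.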
Upvotes: 0
Reputation: 1837
Rob Harrop's answer does cancel the consumer, but it didn't end the blocking subscribe call for me. The following does both, using a Ruby Queue:
require "thread"
unblock = Queue.new # Ruby Queue, not Bunny::Queue
queue = channel.queue('').bind(@x, :routing_key => rk)
consumer = queue.subscribe do |delivery_info, properties, body|
# do something
result = determine_if_it_is_time_to_move_on
unblock.enq true if result
end
unblock.deq # block until a message is enqueued in the ruby Queue
consumer.cancel
Upvotes: 1
Reputation: 3473
The call to subscribe is blocking due to passing :block => true. If you need to access the payload outside of the block, you can take advantage of Ruby's scoping rules:
the_payload = nil
queue = channel.queue('').bind(@x, :routing_key => rk)
queue.subscribe(block: true) do |delivery_info, _, payload|
the_payload = payload
channel.consumers[delivery_info.consumer_tag].cancel
end
# the_payload is now the one received in the block!
Upvotes: 2