Sławosz
Sławosz

Reputation: 11697

How to do blocking subscribe using bunny RabbitMQ client?

In java RabbitMQ client I can do (code in ruby):

 consumer = QueueingConsumer.new(channel);
 channel.basicConsume(queue_name, true, consumer);
 consumer.nextDelivery.getBody

And then third line blocks thread until message comes. But how to achieve it in Bunny client? I can only use block:

channel.queue('').bind(@x, :routing_key => rk).subscribe(block: true) do |_, _, payload|
  # do something
end

or non blocking pop:

delivery_info, properties, payload = q.pop

Is there a way to achieve it like in jruby client using Bunny? The reason I want is that after receiving a message I would like to continue job in my current context.

Upvotes: 1

Views: 3048

Answers (3)

François Beausoleil
François Beausoleil

Reputation: 16525

I needed to receive a single message from a queue. Bunny's Queue#pop is non-blocking and does not have an option to wait. I also needed to support a timeout, and I ended up implementing this:

require "thread"

mutex = Mutex.new
var = ConditionVariable.new
payload = nil
consumer = queue.subscribe(block: false) do |_, _, x_payload|
  mutex.synchronize do
    payload = x_payload
    var.signal
  end
end

mutex.synchronize do
  deadline = Time.now + 10
  while payload.nil? && (remaining = deadline - Time.now) > 0
    var.wait(mutex, remaining)
  end
end

consumer.cancel
raise "timed out waiting for response" if payload.blank?

Inspired in part by https://spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/

This code has not been battle-tested. It works on staging with a single message. It may not work at scale. I was going to hide all of this complexity in a monkey-patch to Bunny::Queue. Callers would have seen a simple blocking_pop(timeout:) API.

Upvotes: 0

Michael Wasser
Michael Wasser

Reputation: 1837

Rob Harrop's answer does cancel the queue, but it didn't end the block for me. The following does both using a ruby Queue

require "thread"

unblock = Queue.new # Ruby Queue, not Bunny::Queue
queue = channel.queue('').bind(@x, :routing_key => rk)
consumer = queue.subscribe do |delivery_info, properties, body|
  # do something
  result = determine_if_it_is_time_to_move_on
  unblock.enq true if result
end

unblock.deq # block until a message is enqueued in the ruby Queue
consumer.cancel

Upvotes: 1

Rob Harrop
Rob Harrop

Reputation: 3473

The call to subscribe is blocking due to passing :block => true. If you need to access the payload outside of the block, you can take advantage of Ruby's scoping rules:

the_payload = nil
queue = channel.queue('').bind(@x, :routing_key => rk)
queue.subscribe(block: true) do |delivery_info, _, payload|
  the_payload = payload
  channel.consumers[delivery_info.consumer_tag].cancel
end
# the_payload is now the one received in the block!

Upvotes: 2

Related Questions