Josh Rickard
Josh Rickard

Reputation: 1603

Using the bunny gem, how do I block until either a message is received or a timeout expires

I'm using the bunny ruby gem to send and receive messages to a rabbitmq server. How do I synchronously pop a message off the queue while putting a timeout on how long I wait (i.e. if no message arrives after 3 seconds, stop blocking)?

One obvious solution would be to just loop over pop calls until either the timeout has expired or a message is received, but that seems very inefficient. Is there a more elegant solution? I took a look at the documentation for bunny as well as the tutorials on the rabbitmq site but I'm not finding a solution for this particular scenario.

Upvotes: 2

Views: 1818

Answers (3)

Chris
Chris

Reputation: 3050

As a slight variation on the above code by @Ilya: https://stackoverflow.com/a/35126963/448858 I found that I had to create a thread to timeout and then shutdown the channel's work pool

module Bunny
  class Queue
    def subscribe(opts = { block: false, timeout: 1000 }, &block)
      ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
      consumer = Consumer.new(@channel, self, ctag)
      consumer.on_delivery(&block)
      @channel.basic_consume_with(consumer)
      if opts[:block]
        Thread.new do
          sleep(opts[:timeout]/1000.0)
          @channel.work_pool.shutdown
        end
        @channel.work_pool.join
      end
    end
  end
end

Upvotes: 0

CppNoob
CppNoob

Reputation: 2400

I did not find a way to do it easily using Bunny, and what I propose here blocks without a timeout. But it does support retrieving one message per call semantics. Given that Bunny internally uses a thread-pool to receive messages, I figured that a simpler way might be to use a blocking queue such as Ruby's Queue class for transporting the message from Bunny's thread-pool to the calling thread. Something like the following:

# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new

# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
  @internal_queue.enq(payload)  # this runs inside Bunny's pool
end

# the call to deq is blocking
response = @internal_queue.deq  # this blocks the main thread till a
                                # message is pushed to the internal_q

You could maintain one @internal_queue for each AMQP channel you need to listen on. You could factor these parts out into separate methods and make a neat blocking API that returns you one message at a time.

I later created a TimedWaitableQueue class wrapping a simple Array extended with monitor MonitorMixin, and then using mutex + condition variable semantics. That allowed blocking on a dequeue call with timeouts.

Upvotes: 1

Ilya Zakharov
Ilya Zakharov

Reputation: 21

To make such feature a was forced to rewrite basic method subscribe. I figured out that we can set timeout time for a channel, but there wasn't such input parameter in a function.

response = nil

subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
  Rails.logger.info "got message #{payload}"
  response = payload
  @channel.consumers[delivery_info.consumer_tag].cancel
end



def subscribe(opts = {block: false}, &block)
    ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
    consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)

    consumer.on_delivery(&block)
    @channel.basic_consume_with(consumer)

    if opts[:block]
      @channel.work_pool.join(opts[:timeout])
    end
end

Upvotes: 2

Related Questions