Jackson Ray Hamilton

Reputation: 9476

Wait until message producer receives confirmation of work having finished by consumer

Using Ruby's "bunny" RabbitMQ client, I want my producer (some Ruby code) to send a message to a consumer (a worker using the "sneakers" gem), and I want my producer to not execute another line of its Ruby code until the producer receives confirmation that the consumer received my message and did some work with it.

In my consumer, I am doing some work and then calling sneakers' ack! method to acknowledge that the message was received and the work was done.

In my producer, I am calling confirm_select on my Bunny::Channel instance to put it into confirmation mode, and after publishing my messages, I call wait_for_confirms on the channel to supposedly wait until all my messages have been ack!-ed by the consumer. (I've tried implementing what I found in the bunny docs here.)

However, it seems that my producer is not waiting for the consumer to call ack!. I'm logging in both my producer and my consumer and finding that my producer seems to think messages have been acknowledged before the consumer actually acknowledges them.

How do I make a RabbitMQ producer wait until the consumer has finished its work in Ruby?

Ruby 2.3.3, RabbitMQ 3.6.12, Erlang 17.3.

Here is my lockfile:

GEM
  specs:
    amq-protocol (2.2.0)
    bunny (2.7.0)
      amq-protocol (>= 2.2.0)
    concurrent-ruby (1.0.5)
    serverengine (1.5.11)
      sigdump (~> 0.2.2)
    sigdump (0.2.4)
    sneakers (2.6.0)
      bunny (~> 2.7.0)
      concurrent-ruby (~> 1.0)
      serverengine (~> 1.5.11)
      thor
    thor (0.20.0)

PLATFORMS
  ruby

DEPENDENCIES
  bunny
  sneakers

BUNDLED WITH
   1.14.6

Here is my consumer / worker (consumer_worker.rb):

class ConsumerWorker
  include Sneakers::Worker

  from_queue 'do-work-here',
             exchange: 'do-work-here',
             exchange_type: :direct,
             durable: true,
             prefetch: 1,
             arguments: {
               :'x-dead-letter-exchange' => 'do-work-here-retry'
             },
             timeout_job_after: 5,
             retry_timeout: 60000,
             ack: true

  def work(msg)
    open('ruby-debug.log', 'a') do |f|
      f.puts "message received: #{msg}"
    end
    sleep 1
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledging message at: #{Time.now.to_i}"
    end
    ack!
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledged message at: #{Time.now.to_i}"
    end
  end
end

In one terminal tab, I am running this worker with:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb

Here is my publisher (publisher.rb):

require 'bunny'
connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
channel = connection.create_channel
channel.confirm_select
queue = channel.queue('do-work-here',
                      {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'},
                       durable: true})
queue.publish('hello world', persistent: true)
channel.wait_for_confirms
open('ruby-debug.log', 'a') do |f|
  f.puts "messages confirmed at: #{Time.now.to_i}"
end

When I run the following command in another tab:

ruby ./publisher.rb

Then my log file (./ruby-debug.log) contains the following lines:

message received: hello world
messages confirmed at: 1505774819
acknowledging message at: 1505774820
acknowledged message at: 1505774820

What I want is for the order of events to be like this:

message received
acknowledging message
acknowledged message
messages confirmed

How do I pull that off?

Upvotes: 0

Views: 1152

Answers (1)

Michael Klishin

Reputation: 786

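Publisher confirms only cover publisher-to-RabbitMQ communication: a confirm tells the publisher that the broker has accepted the message, not that any consumer has received or processed it. Publishers are not aware of consumers.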

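To make the producer wait for the consumer's work, have the consumer send a reply message that the producer waits for. See tutorial 6 for an example of the request/response pattern: https://www.rabbitmq.com/getstarted.html.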
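
The shape of that pattern can be sketched without a broker. The snippet below is only an in-memory illustration, not Bunny code: two plain Ruby Queue objects stand in for the request queue and the reply queue (an assumption made so the example runs on its own), and the :correlation_id and :payload hash keys are hypothetical names chosen for the sketch. The point is that the producer blocks on the reply, not on a publisher confirm.

```ruby
require 'securerandom'

# In-memory stand-ins for the request and reply queues
# (assumption: no RabbitMQ is involved in this sketch).
request_queue = Queue.new
reply_queue   = Queue.new

# "Consumer": does the work, then sends a reply carrying the same correlation ID.
consumer = Thread.new do
  request = request_queue.pop
  result  = request[:payload].upcase # pretend this is the real work
  reply_queue << { correlation_id: request[:correlation_id], payload: result }
end

# "Producer": publish a request, then block until the matching reply arrives.
correlation_id = SecureRandom.uuid
request_queue << { correlation_id: correlation_id, payload: 'hello world' }

reply = reply_queue.pop # blocks here until the consumer has replied
raise 'unexpected reply' unless reply[:correlation_id] == correlation_id

consumer.join
puts "work finished, reply: #{reply[:payload]}"
```

With a real broker, the reply would travel over a second queue and the correlation ID would be used to match replies to requests, as tutorial 6 describes.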

ConditionVariable is a concurrency primitive commonly used to delay further actions until an event happens, with an optional timeout.
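
As a rough sketch of that idea, the class below blocks a caller until another thread hands it a reply, or until a timeout expires. ReplyWaiter and its fulfill and wait methods are hypothetical names invented for this example; only Mutex, ConditionVariable and Process.clock_gettime come from Ruby itself. In a real producer, fulfill would presumably be called from whatever code receives the consumer's reply.

```ruby
# Hypothetical helper: lets one thread wait for a reply delivered by another.
class ReplyWaiter
  def initialize
    @mutex     = Mutex.new
    @condition = ConditionVariable.new
    @done      = false
    @reply     = nil
  end

  # Called from the thread that receives the reply.
  def fulfill(reply)
    @mutex.synchronize do
      @reply = reply
      @done  = true
      @condition.signal
    end
  end

  # Blocks until fulfilled; returns the reply, or nil if the timeout expires.
  def wait(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      # Loop to guard against spurious wakeups.
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @condition.wait(@mutex, remaining)
      end
      @reply
    end
  end
end

# Usage: the worker thread stands in for the consumer finishing its work.
waiter = ReplyWaiter.new
worker = Thread.new do
  sleep 0.1
  waiter.fulfill('done')
end
result = waiter.wait(2) # blocks until the worker calls fulfill
worker.join
puts "reply: #{result.inspect}"
```

The timeout matters in practice: if the consumer is down or rejects the message, the producer would otherwise block forever.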

Publisher confirms and consumer acknowledgements are documented at http://www.rabbitmq.com/confirms.html.

Upvotes: 0
