Reputation: 9476
Using Ruby's "bunny" RabbitMQ client, I want my producer (some Ruby code) to send a message to a consumer (a worker using the "sneakers" gem), and I want my producer to not execute another line of its Ruby code until the producer receives confirmation that the consumer received my message and did some work with it.
In my consumer, I am doing some work and then calling sneakers' ack! method to acknowledge that the message was received and the work was done.
In my producer, I am calling confirm_select on my Bunny::Channel instance to put it into confirmation mode, and after publishing my messages, I call wait_for_confirms on the channel to supposedly wait until all my messages have been ack!-ed by the consumer. (I've tried implementing what I found in the bunny docs here.)
However, it seems that my producer is not waiting for the consumer to call ack!. I'm logging in both my producer and my consumer and finding that my producer seems to think messages have been acknowledged before the consumer actually acknowledges them.
How do I make a RabbitMQ producer wait until the consumer has finished its work in Ruby?
Ruby 2.3.3, RabbitMQ 3.6.12, Erlang 17.3.
Here is my lockfile:
GEM
  specs:
    amq-protocol (2.2.0)
    bunny (2.7.0)
      amq-protocol (>= 2.2.0)
    concurrent-ruby (1.0.5)
    serverengine (1.5.11)
      sigdump (~> 0.2.2)
    sigdump (0.2.4)
    sneakers (2.6.0)
      bunny (~> 2.7.0)
      concurrent-ruby (~> 1.0)
      serverengine (~> 1.5.11)
      thor
    thor (0.20.0)

PLATFORMS
  ruby

DEPENDENCIES
  bunny
  sneakers

BUNDLED WITH
   1.14.6
Here is my consumer / worker (consumer_worker.rb):
class ConsumerWorker
  include Sneakers::Worker

  from_queue 'do-work-here',
             exchange: 'do-work-here',
             exchange_type: :direct,
             durable: true,
             prefetch: 1,
             arguments: {
               :'x-dead-letter-exchange' => 'do-work-here-retry'
             },
             timeout_job_after: 5,
             retry_timeout: 60000,
             ack: true

  def work(msg)
    open('ruby-debug.log', 'a') do |f|
      f.puts "message received: #{msg}"
    end
    sleep 1
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledging message at: #{Time.now.to_i}"
    end
    ack!
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledged message at: #{Time.now.to_i}"
    end
  end
end
In one terminal tab, I am running this worker with:
bundle exec sneakers work ConsumerWorker --require consumer_worker.rb
Here is my publisher (publisher.rb):
require 'bunny'

connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
channel = connection.create_channel
channel.confirm_select
queue = channel.queue('do-work-here',
                      {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'},
                       durable: true})
queue.publish('hello world', persistent: true)
channel.wait_for_confirms
open('ruby-debug.log', 'a') do |f|
  f.puts "messages confirmed at: #{Time.now.to_i}"
end
When I run the following command in another tab:
ruby ./publisher.rb
Then my log file (./ruby-debug.log) contains the following lines:
message received: hello world
messages confirmed at: 1505774819
acknowledging message at: 1505774820
acknowledged message at: 1505774820
What I want is for the order of events to be like this:
message received
acknowledging message
acknowledged message
messages confirmed
How do I pull that off?
Upvotes: 0
Views: 1152
Reputation: 786
Publisher confirms only cover publisher-to-RabbitMQ communication. Publishers are not aware of consumers.
See tutorial 6 for an example of the request/response pattern: https://www.rabbitmq.com/getstarted.html.
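As a rough sketch of how the request/response pattern could look on the producer side: the producer publishes with a correlation_id and a reply_to queue, then blocks until a reply with the same correlation_id arrives. The helper name publish_and_wait, the 10 second default timeout, and returning nil on timeout are all assumptions made for illustration. It also assumes the worker publishes a reply to the reply_to queue once its work is done; how it does that depends on the worker library.

```ruby
require 'securerandom'

# Hypothetical helper (not part of bunny): publishes `payload` to `queue_name`
# via the default exchange and blocks until a reply with the same
# correlation_id arrives. `channel` is assumed to be an open Bunny::Channel.
# Returns the reply body, or nil if `timeout` seconds pass without a reply.
def publish_and_wait(channel, queue_name, payload, timeout: 10)
  lock = Mutex.new
  replied = ConditionVariable.new
  reply = nil
  got_reply = false
  correlation_id = SecureRandom.uuid

  # Exclusive, server-named queue that only this producer reads replies from.
  reply_queue = channel.queue('', exclusive: true)
  reply_queue.subscribe do |_delivery_info, properties, body|
    # Ignore replies that belong to some other request.
    next unless properties[:correlation_id] == correlation_id

    lock.synchronize do
      reply = body
      got_reply = true
      replied.signal
    end
  end

  channel.default_exchange.publish(payload,
                                   routing_key: queue_name,
                                   correlation_id: correlation_id,
                                   reply_to: reply_queue.name)

  # Block until the reply handler signals, or until the deadline passes.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  lock.synchronize do
    until got_reply
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      replied.wait(lock, remaining)
    end
  end
  reply
end

# Usage against a real broker (assumes the bunny gem and a local RabbitMQ):
#   require 'bunny'
#   connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
#   reply = publish_and_wait(connection.create_channel, 'do-work-here', 'hello world')
#   puts(reply.nil? ? 'timed out' : "consumer replied: #{reply}")
```

Note that this sketch declares a new reply queue on every call and never removes it, so the queue lives until the connection closes. A long-running producer would probably want to reuse one reply queue across requests.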
ConditionVariable is a commonly used concurrency primitive for delaying further actions until an event happens, with an optional timeout.
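To illustrate the waiting part on its own, here is a minimal sketch using only Ruby's standard library. The ReplyWaiter class and its method names are hypothetical, and the background thread merely stands in for whatever code receives the consumer's reply.

```ruby
# Hypothetical helper: lets one thread block until another thread reports
# that an event has happened, giving up after a timeout.
class ReplyWaiter
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @done = false
    @value = nil
  end

  # Called from the thread that learns the work has finished.
  def complete(value)
    @mutex.synchronize do
      @value = value
      @done = true
      @condition.signal
    end
  end

  # Called by the producer. Returns the value passed to #complete,
  # or nil if timeout_seconds pass first.
  def wait(timeout_seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
    @mutex.synchronize do
      # Loop because ConditionVariable#wait may wake up early.
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @condition.wait(@mutex, remaining)
      end
      @value
    end
  end
end

waiter = ReplyWaiter.new

# Stand-in for the consumer finishing its work a little later.
Thread.new do
  sleep 0.1
  waiter.complete('work finished')
end

# The producer does not get past this line until #complete is called
# or five seconds pass.
result = waiter.wait(5)
puts(result.nil? ? 'timed out' : result)
```

The @done flag matters here: if the reply arrives before the producer starts waiting, the signal alone would be lost, but the flag lets wait return immediately.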
Publisher confirms and consumer acknowledgements are documented at http://www.rabbitmq.com/confirms.html.
Upvotes: 0