Reputation: 598
Given that I have a worker subscribed to two queues 'Low' and 'High', I would like the worker to only work on messages from the low priority queue if the high priority queue is empty.
I'm attempting to do this by defining two channels and setting the prefetch to a higher value on the higher priority queue, as suggested here: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html
This is my worker code:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
low_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
high_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
def slow_task
# Do some slow work
sleep(1)
end
end
When I run this client against it I do not see the high priority messages processed first:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
low_queue = channel.queue("low")
high_queue = channel.queue("high")
exchange = channel.direct("")
10.times do |i|
message = "LOW #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => low_queue.name
end
# EventMachine.add_periodic_timer(0.0001) do
10.times do |i|
message = "HIGH #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => high_queue.name
end
end
Output:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
Upvotes: 2
Views: 1429
Reputation: 336
Like Michael said, there's a few issues with your current approach:
In order to implement the priority concept, you need to separate receiving the network data from acknowledging it. In our application (which I wrote the blog post about), we used a background thread and a priority queue to reorder the work coming in. This introduces a small buffer of messages in each worker. Some of those messages may be low priority messages that won't be worked until there are no higher priority messages to work on.
Here's a slightly modified worker code that uses a worker thread and a priority queue to get the desired results.
require "rubygems"
require "amqp"
require "pqueue"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
# Our priority queue for buffering messages in the worker's memory
to_process = PQueue.new {|a,b| a[0] > b[0] }
# The pqueue gem isn't thread safe
mutex = Mutex.new
# Background thread for working blocking operation. We can spin up more of
# these to increase concurrency.
Thread.new do
loop do
_, header, payload = mutex.synchronize { to_process.pop }
if payload
puts "#{payload}"
slow_task
# We need to call ack on the EM thread.
EM.next_tick { header.ack }
else
sleep(0.1)
end
end
end
low_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [0, header, payload] }
end
high_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [10, header, payload] }
end
def slow_task
# Do some slow work
sleep(1)
end
end
If you need to increase concurrency, you can spawn more than one background thread.
Upvotes: 2
Reputation: 786
Your approach is one of the most commonly used workarounds. However, there is a couple of issues in the specific example posted.
Prefetch does not control which channel has priority for deliveries. It controls how many messages can be "in progress" (unacknowledged) on it.
This can be used as poor man's prioritization technique, however, you use automatic acknowledgement mode, so prefetch does not come into play (RabbitMQ immediately considers your messages acknowledged as soon as it sends them out).
If you only publish a small number of messages and then your example finishes running, it is much more likely that the ordering will depend greatly on the order you publish messages in.
To see the effect of the prefetch setting with manual acknowledgements, you'd need to run it for a longer period of time (which depends on your message rates, but say, at least a minute.
Upvotes: 1
Reputation: 11638
You're publishing small messages to a processor which is likely processing them within a couple of milliseconds of their arriving. I suspect what you're seeing here is sample noise, and not what you actually think you're seeing.
Can you verify the order in which the messages are being published is as you expect?
Upvotes: 0
Reputation: 11274
Can you try something like this. I am not sure if this produces what you need, but worth trying.
10.times do
10.times do
exchange.publish "HIGH", :routing_key => high_queue.name
end
exchange.publish "LOW", :routing_key => low_queue.name
end
Upvotes: 0