dangerous.beans

Reputation: 598

High priority queue over a lower one in ruby AMQP with RabbitMQ?

Given that I have a worker subscribed to two queues 'Low' and 'High', I would like the worker to only work on messages from the low priority queue if the high priority queue is empty.

I'm attempting to do this by defining two channels and setting the prefetch to a higher value on the higher priority queue, as suggested here: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html

This is my worker code:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

When I run this client against it, the high priority messages are not processed first:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel  = AMQP::Channel.new(connection)

  low_queue    = channel.queue("low")
  high_queue    = channel.queue("high")
  exchange = channel.direct("")

  10.times do |i| 
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end

  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end

end

Output:

Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9

Server >>>

HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9

Upvotes: 2

Views: 1429

Answers (4)

Doug Barth

Reputation: 336

Like Michael said, there are a few issues with your current approach:

  • Not enabling explicit acks means RabbitMQ considers messages delivered when it has sent them, not when you've processed them
  • Your messages are so small that they're able to be delivered quickly over the network
  • Your subscribe block is called when EventMachine reads the network data, once for each full frame of data it read from the socket
  • Finally, blocking the reactor thread (with sleep) will prevent EM from sending the acks out to the socket so the proper behavior isn't achieved.

To implement the priority concept, you need to separate receiving the network data from acknowledging it. In our application (which I wrote the blog post about), we used a background thread and a priority queue to reorder the incoming work. This introduces a small buffer of messages in each worker. Some of those messages may be low priority messages that won't be worked on until there are no higher priority messages left.

Here's a slightly modified version of the worker code that uses a worker thread and a priority queue to get the desired results.

require "rubygems"
require "amqp"
require "pqueue"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  # Our priority queue for buffering messages in the worker's memory
  to_process = PQueue.new {|a,b| a[0] > b[0] }

  # The pqueue gem isn't thread safe
  mutex = Mutex.new

  # Background thread that runs the blocking work off the reactor thread.
  # We can spin up more of these to increase concurrency.
  Thread.new do
    loop do
      _, header, payload = mutex.synchronize { to_process.pop }

      if payload
        puts "#{payload}"
        slow_task
        # We need to call ack on the EM thread.
        EM.next_tick { header.ack }
      else
        sleep(0.1)
      end
    end
  end

  low_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [0, header, payload] }
  end

  high_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [10, header, payload] }
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

If you need to increase concurrency, you can spawn more than one background thread.
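
To isolate the reordering idea from the AMQP plumbing, here is a minimal sketch in plain Ruby, assuming nothing beyond the standard library. `PriorityBuffer` is a hypothetical helper written for illustration; it is not part of the amqp or pqueue gems, and it stands in for the `PQueue` plus `Mutex` pair used above.

```ruby
# Hypothetical in-memory buffer (illustration only, not a gem API).
# Higher priority values are popped first; within one priority,
# items come out in the order they were pushed.
class PriorityBuffer
  def initialize
    @items = []
    @mutex = Mutex.new
    @seq = 0
  end

  # Store a payload together with its priority and an arrival counter.
  def push(priority, payload)
    @mutex.synchronize do
      @seq += 1
      @items << [priority, @seq, payload]
    end
    self
  end

  # Remove and return the best payload, or nil when the buffer is empty.
  def pop
    @mutex.synchronize do
      best = @items.min_by { |priority, seq, _payload| [-priority, seq] }
      return nil unless best
      @items.delete(best)
      best.last
    end
  end
end

buffer = PriorityBuffer.new
3.times { |i| buffer.push(0, "LOW #{i}") }    # arrives first, low priority
3.times { |i| buffer.push(10, "HIGH #{i}") }  # arrives later, high priority

order = []
while (payload = buffer.pop)
  order << payload
end
puts order
```

Even though the LOW items were pushed first, the loop drains every HIGH item before any LOW one. In the real worker, the subscribe blocks would do the pushing and the background thread would do the popping and acking.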

Upvotes: 2

Michael Klishin

Reputation: 786

Your approach is one of the most commonly used workarounds. However, there are a couple of issues in the specific example posted.

Prefetch does not control which channel has priority for deliveries. It controls how many messages can be "in progress" (unacknowledged) on it.

This can be used as a poor man's prioritization technique. However, you use automatic acknowledgement mode, so prefetch does not come into play (RabbitMQ considers your messages acknowledged as soon as it sends them out).

If you only publish a small number of messages and then your example finishes running, the delivery order will most likely just reflect the order in which you published the messages.

To see the effect of the prefetch setting with manual acknowledgements, you'd need to run it for a longer period of time (how long depends on your message rates, but say, at least a minute).
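
As a rough illustration of what prefetch limits under manual acknowledgements, here is a toy model in plain Ruby. `ToyChannel` and `fill` are hypothetical names invented for this sketch; they do not use or mirror the amqp gem API, and they ignore networking entirely. The only point is that the number of unacknowledged messages per channel is capped, and a new delivery becomes possible only after an ack.

```ruby
# Toy model of a consumer channel with a prefetch limit (illustration only).
class ToyChannel
  attr_reader :unacked

  def initialize(prefetch)
    @prefetch = prefetch
    @unacked = []
  end

  # The broker may push another message only while the window has room.
  def can_deliver?
    @unacked.size < @prefetch
  end

  def deliver(message)
    raise "prefetch window full" unless can_deliver?
    @unacked << message
  end

  # A manual ack frees one slot in the window.
  def ack(message)
    @unacked.delete(message)
  end
end

# Deliver from the backlog until the window is full; returns what is left.
def fill(channel, backlog)
  channel.deliver(backlog.shift) while channel.can_deliver? && !backlog.empty?
  backlog
end

low  = ToyChannel.new(1)
high = ToyChannel.new(2)

low_backlog  = fill(low,  %w[LOW0 LOW1 LOW2])
high_backlog = fill(high, %w[HIGH0 HIGH1 HIGH2])

# Acking one high priority message lets exactly one more through.
high.ack("HIGH0")
fill(high, high_backlog)

puts "low in progress:  #{low.unacked.inspect}"
puts "high in progress: #{high.unacked.inspect}"
```

With these made-up prefetch values, the worker holds at most one low priority message and two high priority ones at any time, which is why the ratio only shows up once acks are manual and the queues stay non-empty for a while.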

Upvotes: 1

mcfinnigan

Reputation: 11638

You're publishing small messages to a processor that is likely handling them within a couple of milliseconds of their arrival. I suspect what you're seeing here is sample noise rather than a real ordering effect.

Can you verify that the messages are being published in the order you expect?

Upvotes: 0

Bala

Reputation: 11274

Can you try something like this? I am not sure it produces what you need, but it is worth trying.

10.times do
  10.times do
    exchange.publish "HIGH", :routing_key => high_queue.name
  end
  exchange.publish "LOW", :routing_key => low_queue.name
end

Upvotes: 0
