Fernando
Fernando

Reputation: 4629

Ruby + AMQP: processing queue in parallel

Since most of my tasks depends on the network, I want to process my queue in parallel, not just one message at a time.

So, I'm using the following code:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require 'amqp'

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')

  channel  = AMQP::Channel.new(connection)
  channel.prefetch 5

  queue    = channel.queue("pending_checks", :durable => true)
  exchange = channel.direct('', :durable => true)

  queue.subscribe(:ack => true) do |metadata, payload|
    time = rand(3..9)
    puts 'waiting ' + time.to_s + ' for message ' + payload
    sleep(time)

    puts 'done with '+ payoad

    metadata.ack
  end
end

Why it is not using my prefetch setting? I guess it should get 5 messages and process them in parallel, no?

Upvotes: 4

Views: 787

Answers (1)

joelparkerhenderson
joelparkerhenderson

Reputation: 35483

Prefetch is the maximum number of messages that may be sent to you in advance before you ack.

In other words, the prefetch size does not limit the transfer of single messages to a client, only the sending in advance of more messages while the client still has one or more unacknowledged messages. (From AMPQ docs)

QoS Prefetching Messages

RabbitMQ AMQP Reference

EventMachine is single threaded and event based. For parallel jobs on different threads or processes, see EM::Deferrable, then Thread or spawn.

Also see Hot Bunnies, a fast DSL on top of the RabbitMQ Java client:

https://github.com/ruby-amqp/hot_bunnies

(Thanks for info from Michael Klishin on Google Groups, and stoyan on blogger)

Upvotes: 2

Related Questions