Reputation: 2137
I cannot figure out how to subscribe to my local RabbitMQ server (it worked when I used CloudAMQP). I suspect the problem is that my SendPriceJob isn't connected to the right connection/channel/exchange, but I'm not certain.
class FetchPriceJob
  @queue = :update_price

  def self.perform
    # Do some stuff
    FetchPriceJob.new.publish(response.to_json)
  end

  def publish(data)
    channel.default_exchange.publish(data, routing_key: queue.name)
    connection.close
  end

  def connection
    @conn ||= begin
      conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
      conn.start
    end
  end

  def channel
    @channel ||= connection.create_channel
  end

  def queue
    @queue ||= channel.queue('current_prices')
  end
end
module SendPriceJob
  @queue = :price_serve

  def self.perform
    conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
    conn.start
    ch = conn.create_channel
    x = ch.default_exchange
    q = ch.queue('current_prices')
    begin
      q.subscribe(block: true) do |_, _, body|
        ActionCable.server.broadcast 'prices', body
      end
    rescue Interrupt => _
      ch.close
      conn.close
    end
  end
end
# Procfile
elastic: elasticsearch
redis: redis-server
web: rails server -p 3000
send_worker: QUEUE=price_serve rake resque:work
fetch_worker: QUEUE=update_price rake resque:work
scheduler: rake resque:scheduler
I'm running my RabbitMQ server: http://prntscr.com/i6rcwv. I'm successfully queueing messages on the connection/channel/exchange: http://prntscr.com/i6rd7q. From my logs it seems that the scheduler I'm running works, as does the producer: http://prntscr.com/i6rdxf.
This is my first time working with message queues, so I may have done something completely wrong. I feel like I should be close though, because it was working with CloudAMQP. The only difference was that Bunny.new was configured to connect to an external API.
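One way to narrow this down locally might be to ask the broker how many messages and consumers the queue currently has. The following is only a rough sketch: `queue_status` and `local_queue_status` are hypothetical helper names, and the connection settings are simply copied from the jobs above. It relies on `Bunny::Queue#message_count` and `#consumer_count`.

```ruby
# Summarise a queue's state. `queue` must respond to #name,
# #message_count and #consumer_count, as Bunny::Queue does.
def queue_status(queue)
  {
    name: queue.name,
    messages: queue.message_count,
    consumers: queue.consumer_count
  }
end

# Hypothetical helper: connect with the same settings as the jobs above
# and report on the queue. Needs the bunny gem and a running local broker.
def local_queue_status(queue_name = 'current_prices')
  require 'bunny' # loaded lazily so queue_status works without the gem
  conn = Bunny.new(host: 'localhost', vhost: '/', user: 'guest', password: 'guest')
  conn.start
  queue_status(conn.create_channel.queue(queue_name))
ensure
  # Close the connection only if it was actually opened
  conn.close if conn&.open?
end
```

Calling `local_queue_status` while both workers are running should show whether messages are piling up and whether any consumer is attached. A consumer count of 0 would suggest the subscriber never reached `q.subscribe`.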
Upvotes: 0
Views: 2096
Reputation: 1776
You can use the sneakers or pwwka gem to process RabbitMQ messages in the background. Either one saves you much of the overhead involved in subscribing to the message queues yourself.
Upvotes: 2
Reputation: 156
In this method:
def publish(data)
  channel.default_exchange.publish(data, routing_key: queue.name)
  connection.close
end
Can you try this instead:
queue.publish(data, routing_key: queue.name)
Documentation: http://rubybunny.info/articles/exchanges.html#default_exchange
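For context, Bunny's `Queue#publish` sends the message through the default exchange using the queue's own name as the routing key, so the explicit `routing_key:` is optional. A minimal sketch of a publisher built that way could look like the following. `PricePublisher` is a hypothetical class name, and passing the connection in from outside is my assumption rather than anything in the question:

```ruby
# Publishes payloads to a named queue through an injected connection.
# The connection is expected to behave like a started Bunny::Session.
class PricePublisher
  def initialize(connection, queue_name = 'current_prices')
    @connection = connection
    @queue_name = queue_name
  end

  # Declares the queue, publishes one message via the default exchange
  # (Queue#publish routes by queue name), then closes the connection.
  def publish(data)
    channel = @connection.create_channel
    queue = channel.queue(@queue_name)
    queue.publish(data)
  ensure
    @connection.close
  end
end

# Usage (assumes the bunny gem and a broker on localhost):
#   require 'bunny'
#   conn = Bunny.new(host: 'localhost', vhost: '/', user: 'guest', password: 'guest')
#   conn.start
#   PricePublisher.new(conn).publish('{"price": 1.0}')
```

Injecting the connection makes it possible to exercise the class with a stand-in object instead of a live broker, and the `ensure` closes the connection even when publishing raises.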
Upvotes: 1