Reputation: 10676
I have a consumer that listens for messages. If the flow of messages is more than the consumer can handle, I want to start another instance of this consumer.
But I also want to be able to poll the consumer(s) for information. My thought was that I could use RPC to request this information from the producers, using a fanout exchange so that all the producers get the RPC call.
My question is, first of all, is this possible, and secondly, is it reasonable?
Upvotes: 3
Views: 3028
Reputation: 351
If the question is "is it possible to send an RPC message to more than one server?" the answer is yes.
When you build an RPC call you attach a temporary queue to the message (usually in header.reply_to but you can also use internal message fields). This is the queue where RPC targets will publish their answers.
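To make the reply-queue mechanism concrete, here is a toy in-memory model of it. It is not RabbitMQ or Pika code: `ToyBroker`, `serve_once` and `fanout_rpc_demo` are hypothetical names invented for this sketch, and the `correlation_id` field is an assumption borrowed from the usual RPC pattern. It only illustrates that a fanout exchange hands every bound server its own copy of the request, and that each server answers on the same `reply_to` queue.

```python
import uuid
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, for illustration only)."""

    def __init__(self):
        self.queues = {}    # queue name -> deque of messages
        self.bindings = {}  # fanout exchange name -> list of bound queue names

    def declare_queue(self, name=None):
        # An unnamed queue plays the role of the temporary reply queue.
        name = name or "tmp-" + uuid.uuid4().hex
        self.queues.setdefault(name, deque())
        return name

    def bind(self, exchange, queue_name):
        self.bindings.setdefault(exchange, []).append(queue_name)

    def publish_fanout(self, exchange, message):
        # Fanout: every bound queue receives its own copy of the message.
        for queue_name in self.bindings.get(exchange, []):
            self.queues[queue_name].append(dict(message))

    def publish_direct(self, queue_name, message):
        self.queues[queue_name].append(dict(message))


def serve_once(broker, server_name, queue_name):
    """One RPC server: take a request and answer on its reply_to queue."""
    request = broker.queues[queue_name].popleft()
    broker.publish_direct(request["reply_to"], {
        "correlation_id": request["correlation_id"],
        "body": "%s handled %s" % (server_name, request["body"]),
    })


def fanout_rpc_demo():
    broker = ToyBroker()
    servers = {"server-a": broker.declare_queue("a"),
               "server-b": broker.declare_queue("b")}
    for queue_name in servers.values():
        broker.bind("rpc", queue_name)

    # The caller attaches a temporary reply queue and an id to the request.
    reply_queue = broker.declare_queue()
    call_id = uuid.uuid4().hex
    broker.publish_fanout("rpc", {"reply_to": reply_queue,
                                  "correlation_id": call_id,
                                  "body": "status"})

    for name, queue_name in servers.items():
        serve_once(broker, name, queue_name)

    # Both answers arrive on the single reply queue, tagged with the same id.
    return [m["body"] for m in broker.queues[reply_queue]
            if m["correlation_id"] == call_id]
```

Calling `fanout_rpc_demo()` returns one answer per bound server, all read from the same temporary queue.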
When you send an RPC to a single server you can receive more than one message on the temporary queue: this means that an RPC answer could be formed by:
The problems arising in this scenario are
Here is some code to show how you can do it (Python with the Pika library). Note that it is far from perfect: the biggest problem is that the timeout should be reset whenever a new answer arrives.
    def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
        # Written against the pika 0.x API (add_timeout, callback-first basic_consume).
        # pika 1.x uses call_later(delay, callback) and
        # basic_consume(queue, on_message_callback) instead.
        if timeout is None:
            timeout = self.rpc_timeout
        result_list = []

        def _callback(channel, method, header, body):
            # Collect one reply; stop once result_len replies have arrived.
            msg = self.encoder.decode(body)
            result_dict = {}
            result_dict.update(msg['content']['data'])
            result_list.append(result_dict)
            print("### Got %s/%s RPC result" % (len(result_list), result_len))
            if callback is not None:
                callback(msg)
            if len(result_list) == result_len:
                print("### All results are here: stopping RPC")
                channel.stop_consuming()

        def _outoftime():
            # Fired by the connection when the overall deadline expires.
            self.channel.stop_consuming()
            raise TimeoutError

        if timeout != -1:
            print("### Setting timeout %s seconds" % (timeout))
            self.conn_broker.add_timeout(timeout, _outoftime)

        self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)

        if raise_timeout is True:
            print("### Start consuming RPC with raise_timeout")
            self.channel.start_consuming()
        else:
            try:
                print("### Start consuming RPC without raise_timeout")
                self.channel.start_consuming()
            except TimeoutError:
                pass

        return result_list
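The timeout reset mentioned above can be sketched separately from Pika. The following is a minimal illustration that assumes replies are handed over through a standard-library `queue.Queue` rather than a RabbitMQ channel; `collect_replies`, `expected` and `idle_timeout` are hypothetical names for this sketch. Because each wait starts a fresh countdown, every new answer effectively resets the timeout.

```python
import queue


def collect_replies(reply_queue, expected, idle_timeout):
    """Collect up to `expected` replies, giving up after `idle_timeout` seconds of silence."""
    results = []
    while len(results) < expected:
        try:
            # Each get() waits a fresh idle_timeout, so every reply resets the clock.
            results.append(reply_queue.get(timeout=idle_timeout))
        except queue.Empty:
            # No reply arrived within the idle window: return what we have.
            break
    return results
```

For example, if two replies are already queued and three are expected, the call returns the two replies once `idle_timeout` seconds pass without a third. With Pika, the same idea would mean cancelling and re-adding the timeout inside the message callback, which is not shown here.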
Upvotes: 4
Reputation: 10676
After some research, it seems that this is not possible. If you look at the tutorial on RabbitMQ.com, you will see that there is an id for the call which, as far as I understand, gets consumed.
I've chosen to go another way, which is reading the log files and aggregating the data.
Upvotes: 1