Reputation:
I am new to RabbitMQ and am trying to figure out how to make a client request memory and CPU utilization information from a server, based on this tutorial (https://www.rabbitmq.com/tutorials/tutorial-six-python.html).
The client would request the CPU and memory usage (I believe I will need two queues) and the server would respond with the values.
Is there a simple way to create a client.py and a server.py for this case using the Pika library in Python?
Upvotes: 0
Views: 4176
Reputation: 5340
I would recommend that you follow the first RabbitMQ tutorials if you haven't already. The RPC example builds on concepts covered in the previous examples (direct queues, exclusive queues, acknowledgements, etc.).
The RPC solution proposed in the tutorial requires at least two queues, depending on how many clients you want to use:
- One queue (rpc_queue), used to send requests from the client to the server.
- One exclusive queue per client, used to receive the server's responses.

The request/response cycle:
- The client sends a message to rpc_queue. Each message includes a reply_to property, with the name of the client's exclusive queue the server should reply to, and a correlation_id property, which is just a unique id used to track the request.
- The server waits for messages on rpc_queue. When a message arrives, it prepares the response, adds the correlation_id to the new message, and sends it to the queue defined in the reply_to message property.
- The client waits on its exclusive queue until it receives a message with the correlation_id that was originally generated.

Jumping straight to your problem, the first thing to do is to define the message format you want to use for your responses. You can use JSON, msgpack or any other serialization library. For example, if using JSON, one message could look something like this:
{
  "cpu": 1.2,
  "memory": 0.3
}
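To make the request/response cycle concrete before involving a broker, here is a minimal in-memory sketch. Plain queue.Queue objects stand in for RabbitMQ, and the queue name client_reply_queue and the three function names are made up for illustration; only rpc_queue, reply_to and correlation_id come from the tutorial.

```python
import json
import queue
import uuid

# In-memory stand-ins for the broker's queues (assumption: no RabbitMQ involved).
queues = {"rpc_queue": queue.Queue(), "client_reply_queue": queue.Queue()}


def client_send_request():
    """Publish an empty request carrying reply_to and a fresh correlation_id."""
    correlation_id = str(uuid.uuid4())
    queues["rpc_queue"].put({
        "reply_to": "client_reply_queue",
        "correlation_id": correlation_id,
        "body": "",
    })
    return correlation_id


def server_handle_one(cpu, memory):
    """Consume one request and reply on the queue named in its reply_to."""
    request = queues["rpc_queue"].get_nowait()
    queues[request["reply_to"]].put({
        "correlation_id": request["correlation_id"],
        "body": json.dumps({"cpu": cpu, "memory": memory}),
    })


def client_read_response(expected_id):
    """Return the decoded reply, skipping messages with another correlation_id."""
    while True:
        message = queues["client_reply_queue"].get_nowait()
        if message["correlation_id"] == expected_id:
            return json.loads(message["body"])


request_id = client_send_request()
server_handle_one(cpu=1.2, memory=0.3)
stats = client_read_response(request_id)
print(stats)
```

The real client and server follow the same three steps; the broker just replaces the dictionary of queues.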
Then, in your server.py:
import json

import pika

def on_request(channel, method, props, body):
    # current_cpu_usage() and current_memory_usage() are placeholders
    # for your own measurement code.
    response = {'cpu': current_cpu_usage(),
                'memory': current_memory_usage()}
    properties = pika.BasicProperties(correlation_id=props.correlation_id)
    # Reply on the client's queue, echoing its correlation_id.
    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=properties,
                          body=json.dumps(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

# ...
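The server snippet leaves current_cpu_usage() and current_memory_usage() undefined. One possible way to fill them in with only the standard library is sketched below. It assumes a Linux host (it reads /proc/meminfo and uses os.getloadavg()), it uses the load average as a rough proxy for CPU utilization, and the helper parse_meminfo is a name invented here. A third-party library such as psutil is a common alternative.

```python
import os


def parse_meminfo(text):
    """Return the used-memory fraction (0.0 to 1.0) from /proc/meminfo-style text."""
    fields = {}
    for line in text.splitlines():
        name, _, rest = line.partition(":")
        parts = rest.split()
        if parts:
            fields[name.strip()] = int(parts[0])  # values are reported in kB
    total = fields["MemTotal"]
    available = fields["MemAvailable"]
    return (total - available) / total


def current_memory_usage():
    """Fraction of system memory in use (assumption: Linux with /proc/meminfo)."""
    with open("/proc/meminfo") as meminfo:
        return parse_meminfo(meminfo.read())


def current_cpu_usage():
    """1-minute load average per CPU core, a rough utilization proxy (Unix only)."""
    load_1min = os.getloadavg()[0]
    return load_1min / (os.cpu_count() or 1)
```

Keeping the parsing in its own function makes it easy to check against a sample string without depending on the machine the server runs on.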
And in your client.py:
import json
import time
import uuid

import pika

class ResponseTimeout(Exception):
    pass

class Client:
    # similar constructor as `FibonacciRpcClient` from tutorial...

    def on_response(self, channel, method, props, body):
        # Only accept the reply that matches the pending request.
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode())

    def call(self, timeout=2):
        self.response = None
        self.correlation_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.correlation_id),
                                   body='')
        start_time = time.time()
        # Poll the connection until the reply arrives or the timeout expires.
        while self.response is None:
            if (start_time + timeout) < time.time():
                raise ResponseTimeout()
            self.connection.process_data_events()
        return self.response
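The waiting loop in call() can also be pulled out into a small helper, which makes the timeout logic testable without a broker. The sketch below is one way to do it; wait_for and its parameters are names invented here, and it uses time.monotonic() instead of time.time() so that system clock adjustments don't affect the deadline.

```python
import time


class ResponseTimeout(Exception):
    pass


def wait_for(is_done, poll, timeout=2.0):
    """Call poll() repeatedly until is_done() is true or the timeout expires."""
    deadline = time.monotonic() + timeout
    while not is_done():
        if time.monotonic() > deadline:
            raise ResponseTimeout()
        poll()


# Demo with a fake poll function standing in for process_data_events().
state = {"polls": 0}


def fake_poll():
    state["polls"] += 1


wait_for(lambda: state["polls"] >= 3, fake_poll)
```

Inside call(), the loop would then become something like wait_for(lambda: self.response is not None, self.connection.process_data_events, timeout).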
As you can see, the code is pretty much the same as the original FibonacciRpcClient. The main differences are:
- The response is decoded from JSON.
- The call() method doesn't require a body argument (there's nothing to send to the server).
- The call() method raises ResponseTimeout if no response arrives in time.

Still, there are a lot of things to improve here:
- If a request arrives without a reply_to queue, our server is going to crash, and it will crash again on restart (the broken message will be requeued indefinitely as long as it isn't acknowledged by our server).

You may also consider replacing the RPC approach with a publish/subscribe pattern: the server simply broadcasts its CPU/memory state at a fixed interval, and one or more clients receive the updates.
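For the reply_to crash mentioned above, one option is to reject unanswerable requests without requeueing them, so the broker doesn't redeliver them after a restart. The following is a sketch under a few assumptions: make_response and make_properties are extra parameters introduced here so that the handler can be exercised without a broker, and the channel is expected to offer pika's basic_publish, basic_ack and basic_reject methods.

```python
import json


def on_request(channel, method, props, body, make_response, make_properties):
    """RPC handler that drops requests it cannot answer instead of crashing."""
    if not props.reply_to:
        # No queue to answer on: reject without requeueing so the broken
        # message is not delivered again after a restart.
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=make_properties(props.correlation_id),
                          body=json.dumps(make_response()))
    channel.basic_ack(delivery_tag=method.delivery_tag)
```

With Pika, the two extra arguments could be bound with functools.partial before registering the callback, for example make_properties=lambda cid: pika.BasicProperties(correlation_id=cid) and a make_response function that returns the CPU/memory dictionary.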
Upvotes: 1