Reputation: 31
I'm trying to build an RPC service in PHP using RabbitMQ, similar to this example: http://www.rabbitmq.com/tutorials/tutorial-six-java.html I'm using this PECL extension: http://pecl.php.net/package/amqp (version 1.0.3)
The problem is that my callback queue (declared in the client script) is locked for the server when I add the AMQP_EXCLUSIVE flag to it.
Here is my Server.php
// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
// declare queue to consume messages from
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();
// start consuming messages
$queue->consume(function ($envelope, $queue) use ($channel, $exchange) {
    // create callback queue
    $callbackQueue = new \AMQPQueue($channel);
    $callbackQueue->setName($envelope->getReplyTo());
    $callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
    /* WARNING: Following code line causes error. See rabbit logs below:
     * connection <0.1224.10>, channel 1 - error:
     * {amqp_error,resource_locked,
     * "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
     * 'queue.bind'}
     */
    $callbackQueue->bind($exchange->getName(), 'rpc_reply');
    // trying to publish response back to client's callback queue
    $exchange->publish(
        json_encode(array('processed by remote service!')),
        'rpc_reply',
        AMQP_MANDATORY & AMQP_IMMEDIATE
    );
    $queue->ack($envelope->getDeliveryTag());
});
And here is my Client.php
// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();
// binding exchange to queue
$queue->bind($exchangeName, 'temp_action');
// create correlation_id
$correlationId = sha1(time() . rand(0, 1000000));
// create an anonymous callback queue to receive the server's response
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
$callbackQueue->declare();
// publishing message to exchange (passing it to server)
$exchange->publish(
    json_encode(array('process me!')),
    'temp_action',
    AMQP_MANDATORY,
    array(
        'reply_to' => $callbackQueue->getName(), // pass callback queue name
        'correlation_id' => $correlationId
    )
);
// going to wait for remote service complete tasks. tick once a second
$attempts = 0;
while ($attempts < 5)
{
    echo 'Attempt ' . $attempts . PHP_EOL;
    $envelope = $callbackQueue->get();
    if ($envelope) {
        echo 'Got response! ';
        print_r($envelope->getBody());
        echo PHP_EOL;
        exit;
    }
    sleep(1);
    $attempts++;
}
In the end I just see this error in RabbitMQ's logs:
connection <0.1224.10>, channel 1 - error:
{amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
'queue.bind'}
Question: What is the proper way to create a callbackQueue object in Server.php? It appears that my Server.php uses a different connection to the RabbitMQ server than Client.php does. What should I do here? How can I "share" Client.php's connection on the Server.php side?
UPDATE: Here are some more RabbitMQ logs
My Server.php connection (Id is: <0.22322.27>)
=INFO REPORT==== 20-Jun-2012::13:30:22 ===
accepting AMQP connection <0.22322.27> (127.0.0.1:58457 -> 127.0.0.1:5672)
My Client.php connection (Id is: <0.22465.27>)
=INFO REPORT==== 20-Jun-2012::13:30:38 ===
accepting AMQP connection <0.22465.27> (127.0.0.1:58458 -> 127.0.0.1:5672)
Now I see Server.php causes error:
=ERROR REPORT==== 20-Jun-2012::13:30:38 ===
connection <0.22322.27>, channel 1 - error:
{amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-g6Q...' in vhost '/'",
'queue.bind'}
My assumption: since Client.php and Server.php do not share a connection with the same Id, I suspect it's impossible for both of them to use the exclusive queue declared in Client.php.
Upvotes: 2
Views: 5613
Reputation: 328
There are a few issues with your implementation:
You don't need to declare a named exchange to publish messages. In this RPC example an AMQPExchange object is still needed, but only as the handle you publish through: left unnamed, it refers to the broker's default exchange, which routes each message to the queue whose name equals the routing key. In effect, all communication happens directly on the queues, and no custom exchange or binding is involved.
$exchange = new AMQPExchange($channel);
$exchange->publish(...);
When you use AMQPQueue::setName() along with AMQPQueue::declare(), you are declaring a queue with a user-defined name. If you declare the queue without a name, the broker generates a random name for it (for example amq.gen-...); this is commonly called a temporary queue. It is useful when a single client instance needs a private queue to receive messages on. Declared with AMQP_EXCLUSIVE, such a queue is for that one instance to consume from, and it is disposed of when the connection is closed.
When an RPC client publishes a request (AMQPExchange::publish()), it must pass a reply_to value as one of the publish attributes. This lets the RPC server read the randomly generated queue name when it receives the request and use it as the routing key, i.e. the name of the queue on which it replies to that client. Along with the temporary queue name, the client must send a correlation_id so that it can match each reply it receives to the request that produced it.
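The correlation check can be sketched in isolation, without a broker. This is a minimal illustration only: `PendingReplies` and its methods are hypothetical names made up for this example and are not part of the amqp extension, and the queue name is a placeholder.

```php
<?php
// Hypothetical helper, NOT part of php-amqp: pairs incoming replies with
// the requests that are still waiting for them.
final class PendingReplies
{
    /** @var array<string, true> correlation ids still awaiting a reply */
    private $waiting = [];

    // Register a new request and return the attributes to publish with it.
    public function register(string $replyQueueName): array
    {
        $correlationId = bin2hex(random_bytes(16));
        $this->waiting[$correlationId] = true;

        return [
            'correlation_id' => $correlationId,
            'reply_to'       => $replyQueueName,
        ];
    }

    // Return true only for the first reply to a request we actually sent.
    public function accept(?string $correlationId): bool
    {
        if ($correlationId === null || !isset($this->waiting[$correlationId])) {
            return false; // stale, duplicated, or foreign reply
        }
        unset($this->waiting[$correlationId]);
        return true;
    }
}

$pending = new PendingReplies();
$attributes = $pending->register('amq.gen-example'); // placeholder queue name
```

The returned array has the same shape as the attributes argument of the publish call, and `accept()` plays the role of the correlation id comparison inside the consume callback.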
Client
$exchange = new AMQPExchange($channel); // unnamed: publishes via the default exchange
$rpcServerQueueName = 'rpc_queue';
$client_queue = new AMQPQueue($channel);
$client_queue->setFlags(AMQP_EXCLUSIVE);
$client_queue->declareQueue();
$callbackQueueName = $client_queue->getName(); // e.g. amq.gen-JzTY20BRgKO-HjmUJj0wLg
// set publish attributes
$corrId = uniqid();
$attributes = array(
    'correlation_id' => $corrId,
    'reply_to' => $callbackQueueName
);
$exchange->publish(
    json_encode(['request message']),
    $rpcServerQueueName,
    AMQP_NOPARAM,
    $attributes
);
// listen for the response
$response = null;
$callback = function (AMQPEnvelope $message, AMQPQueue $q) use ($corrId, &$response) {
    if ($message->getCorrelationId() == $corrId) {
        $response = $message->getBody();
        $q->ack($message->getDeliveryTag());
        return false; // returning false tells consume() to stop; otherwise it keeps blocking
    }
};
$client_queue->consume($callback);
Server
$exchange = new AMQPExchange($channel);
$rpcServerQueueName = 'rpc_queue';
$srvr_queue = new AMQPQueue($channel);
$srvr_queue->setName($rpcServerQueueName); // intentionally declares the rpc_server queue name
$srvr_queue->declareQueue();
...
$srvr_queue->consume(function (AMQPEnvelope $message, AMQPQueue $q) use ($exchange) {
    // publish with the exchange instance to the reply-to queue
    $exchange->publish(
        json_encode(['response message']), // response message
        $message->getReplyTo(), // routing key: the reply-to queue named in the request
        AMQP_NOPARAM, // no publish flags
        array('correlation_id' => $message->getCorrelationId()) // echo the correlation id back
    );
    // acknowledge receipt of the message
    $q->ack($message->getDeliveryTag());
});
In this case, AMQP_EXCLUSIVE is used only on the RPC client's temporary queue, one per client instance. In other words, the client creates a disposable temporary queue for itself so that it alone receives the answers from the RPC server. The flag ensures that no other connection can declare, bind, consume from, or delete that queue; it is locked to the connection that declared it. It's important to note that AMQP_EXCLUSIVE does not prevent the RPC server from responding on the client's reply-to queue, because publishing goes through an exchange and never touches the queue directly. The lock applies when a second connection tries to operate on the queue itself: that attempt is rejected with resource_locked, which is the error in the question's logs.
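The locking rule can be illustrated with a toy in-memory model. This is a sketch only: `ToyBroker`, its methods, and the connection ids are hypothetical stand-ins invented for this example, not part of php-amqp or RabbitMQ, and the model assumes exclusivity is tracked per connection.

```php
<?php
// Toy in-memory model (NOT the php-amqp API): each exclusive queue remembers
// the connection that declared it and refuses queue operations from others.
final class ToyBroker
{
    /** @var array<string, string|null> queue name => owner id, or null if not exclusive */
    private $owners = [];

    public function declareQueue(string $connId, string $name, bool $exclusive): void
    {
        if (array_key_exists($name, $this->owners)) {
            // Re-declaring an existing queue is itself a queue operation.
            $this->assertAccess($connId, $name, 'queue.declare');
            return;
        }
        $this->owners[$name] = $exclusive ? $connId : null;
    }

    public function bind(string $connId, string $name): void
    {
        $this->assertAccess($connId, $name, 'queue.bind');
    }

    // Publishing is accepted from any connection; ownership is never checked.
    // Returns whether a queue with that name exists to receive the message.
    public function publish(string $connId, string $name): bool
    {
        return array_key_exists($name, $this->owners);
    }

    private function assertAccess(string $connId, string $name, string $method): void
    {
        $owner = $this->owners[$name] ?? null;
        if ($owner !== null && $owner !== $connId) {
            throw new RuntimeException("resource_locked: {$method} on '{$name}'");
        }
    }
}

$broker = new ToyBroker();
$broker->declareQueue('client-conn', 'amq.gen-example', true);

$error = null;
try {
    $broker->bind('server-conn', 'amq.gen-example'); // what the question's server does
} catch (RuntimeException $e) {
    $error = $e->getMessage();
}
$published = $broker->publish('server-conn', 'amq.gen-example'); // what it should do
```

In this model the server's bind fails while its publish succeeds, which mirrors the advice here: reply by publishing to the reply-to name and leave the queue itself to the client.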
@Denis: Your implementation in this case is correct up to a point
Bad - don't re-declare the Queue in the server. That's the client's job
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setName($envelope->getReplyTo());
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
...
$callbackQueue->bind($exchange->getName(), 'rpc_reply');
Here the server re-declares and binds the client's callback queue (the name taken from reply_to) on its own connection. The client has already declared that queue as exclusive, so the bind is rejected with resource_locked. You can cut out all of that and just keep the last part:
// trying to publish response back to client's callback queue
$exchange->publish(
json_encode(array('processed by remote service!')),
'rpc_reply', //<--BAD Should be: $envelope->getReplyTo()
AMQP_MANDATORY & AMQP_IMMEDIATE
);
Then modify the above by replacing:
'rpc_reply'
with
$envelope->getReplyTo()
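A side note on the flags argument in the publish call quoted above: `AMQP_MANDATORY & AMQP_IMMEDIATE` uses bitwise AND. Assuming the extension's flag constants are distinct single-bit values, as option bitmasks conventionally are, AND-ing two of them yields 0 (no flags at all), whereas `|` combines them. The sketch below uses stand-in constants with made-up values, not the extension's real ones, and `hasFlag` is a hypothetical helper.

```php
<?php
// Stand-in constants with made-up values; NOT the extension's real ones.
const FLAG_MANDATORY = 0b01;
const FLAG_IMMEDIATE = 0b10;

$anded = FLAG_MANDATORY & FLAG_IMMEDIATE; // no bit is set in both operands
$ored  = FLAG_MANDATORY | FLAG_IMMEDIATE; // both bits are set

// Hypothetical helper: is the given flag present in the mask?
function hasFlag(int $mask, int $flag): bool
{
    return ($mask & $flag) === $flag;
}
```

Switching the operator to `|` would request both behaviours at once, so check that you actually want them first; the examples in this answer simply pass AMQP_NOPARAM.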
Don't Declare a Queue Name on the client side
// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
//$queue->setName('tempQueue'); //remove this line
//add exclusivity
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();
//no need for binding... we're communicating on the queue directly
//there is no one listening to 'temp_action' so this implementation will send your message into limbo
//$queue->bind($exchangeName, 'temp_action'); //remove this line
Upvotes: 5
Reputation: 2313
This is my answer to this question, as posted on the official RabbitMQ mailing list.
Although they don't use the same library, the official tutorials have been ported to PHP here:
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php
The problem in your code is that you declare queues with different options.
As one reply says, if you declare queue A as durable, then every other declaration of that queue must also be durable. The same goes for the exclusive flag.
Also, you don't need to redeclare a queue to publish messages to it. As an RPC server, you assume that the address sent in the 'reply_to' property already exists. I think it is the responsibility of the RPC client to make sure the queue on which it waits for replies already exists.
Addendum:
Exclusivity in queues means that only the connection that declared the queue can access it.
Upvotes: 1
Reputation: 2708
On your server you should also declare your queue as exclusive. Remember, every declaration of a RabbitMQ queue should use the same flags. For example, if one side declares a queue as durable, the other side should also declare it as durable. So on your server, set the flag with $callbackQueue->setFlags(AMQP_EXCLUSIVE);
much as your client does.
Upvotes: 0