I'm a beginner with RabbitMQ and I'm having trouble setting up a topic exchange. I'm working with two scripts: emitter.php and receiver.php. The receiver.php script will be replicated using docker-compose. The emitter.php script generates messages and publishes each one with a randomly chosen routing key, while each receiver binds its queue with a randomly chosen binding key.
emitter.php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$EXCHANGE = "e005";

$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'rabbit',
    'password'
);
$channel = $connection->channel();
$channel->exchange_declare($EXCHANGE, 'topic', false, false, false);

try {
    for ($i=0; ;$i++) {
        $mode = ["odd", "even", "*"][rand(0, 2)];
        $level = ["info", "debug", "warning", "error", "critical", "*"][rand(0, 5)];
        $broadcast = rand(0, 9) === 7; // 10% chance of a broadcast
        $routing_key = $broadcast ? "#" : "$mode.$level";
        $msg = new AMQPMessage($i);
        $channel->basic_publish($msg, $EXCHANGE, $routing_key);
        echo " - [x] Sent $i by $routing_key\n";
        sleep(1);
    }
} finally {
    $channel->close();
    $connection->close();
}
receiver.php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$MODE = ["odd", "even", "*"][rand(0, 2)];
$LEVEL = ["info", "debug", "warning", "error", "critical", "*"][rand(0, 5)];
$ROUTING_KEY="$MODE.$LEVEL";
$EXCHANGE = "e005";

$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'rabbit',
    'password'
);
$channel = $connection->channel();
$channel->exchange_declare($EXCHANGE, 'topic', false, false, false);
list($queue, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue, $EXCHANGE, $ROUTING_KEY);

echo " - [$ROUTING_KEY] Waiting for logs. To exit press CTRL+C\n";

$callback = function (AMQPMessage $msg) {
    global $ROUTING_KEY;
    $key = $msg->getRoutingKey();
    echo " - [$ROUTING_KEY] got $msg->body by $key";
};
$channel->basic_consume($queue, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();
However, none of the receivers ever consumes a message. Where did I go wrong? I also tried setting the routing keys manually instead of generating them randomly, but that did not help either.
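For reference, here is a minimal sketch of how topic-exchange matching is usually described: the wildcards `*` (exactly one word) and `#` (zero or more words) are interpreted only in the binding key, while the routing key of a published message is treated as a sequence of literal words. The `topicMatches` and `matchWords` helpers are hypothetical, written only for illustration. They are not part of php-amqplib or RabbitMQ, and the broker's real implementation may differ in its details.

```php
<?php
// Hypothetical helper (not a php-amqplib API): mimics how a topic exchange
// compares a binding pattern with the routing key of a published message.
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), 0, explode('.', $routingKey), 0);
}

// Recursively match pattern words $p[$i..] against routing-key words $k[$j..].
function matchWords(array $p, int $i, array $k, int $j): bool
{
    if ($i === count($p)) {
        // Pattern exhausted: it matches only if the key is exhausted too.
        return $j === count($k);
    }
    if ($p[$i] === '#') {
        // '#' in the pattern may absorb zero or more routing-key words.
        for ($n = $j; $n <= count($k); $n++) {
            if (matchWords($p, $i + 1, $k, $n)) {
                return true;
            }
        }
        return false;
    }
    if ($j === count($k)) {
        return false;
    }
    // '*' in the pattern stands for exactly one word; any other pattern
    // word must equal the routing-key word literally.
    if ($p[$i] === '*' || $p[$i] === $k[$j]) {
        return matchWords($p, $i + 1, $k, $j + 1);
    }
    return false;
}

var_dump(topicMatches('odd.*', 'odd.info'));  // bool(true)
var_dump(topicMatches('odd.info', 'odd.*'));  // bool(false): '*' in a routing key is a literal word
var_dump(topicMatches('*.*', '#'));           // bool(false): '#' as a routing key is one literal word
var_dump(topicMatches('#', 'even.error'));    // bool(true)
```

Under these assumptions, a message published with the routing key `#` reaches only queues bound with a pattern such as `#` or `*`, and a key such as `odd.*` does not reach a queue bound with `odd.info`.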