vchslv

Reputation: 11

Troubleshooting topic exchanges in php-amqplib

I'm a beginner with RabbitMQ and am having trouble setting up topic exchanges. I'm working with two scripts: emitter.php and receiver.php. The receiver.php script is run as several replicas using docker-compose. The emitter.php script publishes one message per second with a randomly chosen routing key, while each receiver binds its queue to the exchange with a randomly chosen binding key.

emitter.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$EXCHANGE = "e005";

$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'rabbit',
    'password'
);
$channel = $connection->channel();
$channel->exchange_declare($EXCHANGE, 'topic', false, false, false);

try {
    for ($i = 0; ; $i++) {
        $mode = ["odd", "even", "*"][rand(0, 2)];
        $level = ["info", "debug", "warning", "error", "critical", "*"][rand(0, 5)];
        $broadcast = rand(0, 9) === 7; // 10% chance of a broadcast
        $routing_key = $broadcast ? "#" : "$mode.$level";
        $msg = new AMQPMessage($i);
        $channel->basic_publish($msg, $EXCHANGE, $routing_key);
        echo " - [x] Sent $i by $routing_key\n";
        sleep(1);
    }
} finally {
    $channel->close();
    $connection->close();
}

receiver.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$MODE = ["odd", "even", "*"][rand(0, 2)];
$LEVEL = ["info", "debug", "warning", "error", "critical", "*"][rand(0, 5)];
$ROUTING_KEY = "$MODE.$LEVEL";
$EXCHANGE = "e005";

$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'rabbit',
    'password'
);
$channel = $connection->channel();
$channel->exchange_declare($EXCHANGE, 'topic', false, false, false);
list($queue, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue, $EXCHANGE, $ROUTING_KEY);

echo " - [$ROUTING_KEY] Waiting for logs. To exit press CTRL+C\n";

$callback = function (AMQPMessage $msg) {
    global $ROUTING_KEY;
    $key = $msg->getRoutingKey();
    echo " - [$ROUTING_KEY] got $msg->body by $key\n";
};

$channel->basic_consume($queue, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

However, none of the receivers ever consumes a message. Where did I go wrong? I also tried hard-coding the routing keys instead of generating them randomly, but without success.
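
One thing that may be relevant to the emitter: as far as I understand topic exchanges, `*` (exactly one word) and `#` (zero or more words) act as wildcards only in the binding key passed to `queue_bind`. In the routing key passed to `basic_publish` they are treated as ordinary words, so a message published with the key `#` would not be a broadcast. The sketch below is a plain-PHP approximation of that matching rule for experimenting without a broker. It is not php-amqplib code, and `topicMatches` and `matchWords` are hypothetical helper names introduced only for this illustration.

```php
<?php
// Hypothetical helpers (not part of php-amqplib): approximate how a topic
// exchange compares a binding key, which may contain wildcards, with the
// routing key of a published message.

function topicMatches(string $bindingKey, string $routingKey): bool
{
    // Keys are dot-separated words; an empty key is treated as zero words.
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($pattern, $words);
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // The pattern is exhausted: match only if no words are left over.
        return $words === [];
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);
    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($rest, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    // '*' stands for exactly one word; any other word must match literally.
    if ($head === '*' || $head === $words[0]) {
        return matchWords($rest, array_slice($words, 1));
    }
    return false;
}

var_dump(topicMatches('odd.*', 'odd.info'));    // bool(true)
var_dump(topicMatches('*.error', 'even.error')); // bool(true)
var_dump(topicMatches('#', 'odd.info'));        // bool(true)
var_dump(topicMatches('odd.info', '#'));        // bool(false)
var_dump(topicMatches('odd.info', 'odd.*'));    // bool(false)
```

Under these assumptions, the last two calls suggest that wildcards placed in the published routing key do not fan a message out to bindings such as `odd.info`; only concrete keys like `odd.info` on the publishing side, combined with wildcard bindings on the receiving side, would behave as a pattern match.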

