nnikolay

Reputation: 1751

Symfony Messenger with CloudEvents and multiple queues

I am implementing CloudEvents in my application using the Symfony Messenger component.

I have two queues, both of which should handle CloudEvents.

Here is my current configuration:

routing:
    'CloudEvents\V1\CloudEvent':
      - incoming
      - outgoing

When I dispatch a CloudEvent, I want to specify exactly which queue it should be added to.

$this->messageBus->dispatch(
    new Envelope(
        $cloudEvent,
        [
            new TransportMessageIdStamp('outgoing')
        ]
    )
);

or

$this->messageBus->dispatch(
    $cloudEvent,
    [
        new TransportMessageIdStamp('outgoing')
    ]
);

I tried both, but the event still goes into the "outgoing" channel. How can I prevent this?

Upvotes: -1

Views: 59

Answers (1)

nnikolay

Reputation: 1751

I solved the problem with a middleware, but there's one part of the solution I'm not happy with. To skip itself during consumption, the middleware checks how many stamps the envelope carries, so it will break as soon as anything adds another stamp at dispatch time. Perhaps someone has a more elegant way of handling this.
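
One possibly simpler route, assuming Symfony 6.2 or newer, is TransportNamesStamp. Unlike TransportMessageIdStamp, which only carries the ID a transport assigned to a message, TransportNamesStamp tells Messenger which of the configured transports to use for a single dispatch. A minimal sketch follows; the transportFor() and dispatchCloudEvent() helpers are hypothetical names, and the type-to-transport map mirrors the one in the middleware class below:

```php
<?php

declare(strict_types=1);

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

// Hypothetical helper: map a CloudEvent type to one of the configured transports.
function transportFor(string $eventType): string
{
    return match ($eventType) {
        'task.processing' => 'incoming',
        'task.completion' => 'outgoing',
        default => throw new \RuntimeException(
            sprintf('No transport found for event type: %s', $eventType)
        ),
    };
}

// Hypothetical helper: dispatch the event to exactly one transport.
// Assumes symfony/messenger >= 6.2, where TransportNamesStamp is available.
function dispatchCloudEvent(MessageBusInterface $bus, object $cloudEvent, string $eventType): Envelope
{
    return $bus->dispatch($cloudEvent, [
        new TransportNamesStamp([transportFor($eventType)]),
    ]);
}
```

With this, the caller would do something like dispatchCloudEvent($this->messageBus, $cloudEvent, $cloudEvent->getType()) and no custom middleware would be needed. I have not verified this against every Messenger version, so treat it as a starting point.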

Here is my middleware class:

<?php

declare(strict_types=1);

namespace App\Messenger\Middleware;

use CloudEvents\V1\CloudEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

readonly class CloudEventRoutingMiddleware implements MiddlewareInterface
{
    public function __construct(
        private SendersLocatorInterface $sendersLocator
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();
        $stamps = $envelope->all();

        // We want to prevent the middleware from being used during event consumption.
        // This approach isn't ideal; we should find a more elegant solution.
        if (count($stamps) > 1) {
            return $stack->next()->handle($envelope, $stack);
        }

        if ($message instanceof CloudEvent) {
            $transportName = $this->determineTransportName($message);
            $senders = $this->sendersLocator->getSenders($envelope);

            foreach ($senders as $name => $sender) {
                if ($name === $transportName) {
                    /** @var SenderInterface $sender */
                    // Return the envelope produced by the sender so any stamps it adds are kept.
                    return $sender->send($envelope);
                }
            }
        }

        return $stack->next()->handle($envelope, $stack);
    }

    private function determineTransportName(CloudEvent $event): string
    {
        return match ($event->getType()) {
            'task.processing' => 'incoming',
            'task.completion' => 'outgoing',
            default => throw new \RuntimeException(
                sprintf('No transport found for event type: %s', $event->getType())
            ),
        };
    }
}
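
For the stamp-count guard, a less brittle check might be to look for ReceivedStamp, which Messenger attaches to envelopes that arrive from a transport. This is only a sketch of what the condition could look like; the wasReceivedFromTransport() helper name is hypothetical:

```php
<?php

declare(strict_types=1);

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

// Hypothetical helper: true when the envelope is being consumed by a worker,
// i.e. it was received from a transport rather than freshly dispatched.
function wasReceivedFromTransport(Envelope $envelope): bool
{
    // Envelope::last() returns the most recent stamp of the given class, or null.
    return null !== $envelope->last(ReceivedStamp::class);
}

// Inside handle(), this would replace the count($stamps) > 1 check:
//
//     if (wasReceivedFromTransport($envelope)) {
//         return $stack->next()->handle($envelope, $stack);
//     }
```

Because the check no longer depends on how many stamps are present, adding further stamps at dispatch time should not affect it.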

Upvotes: 0
