Reputation: 13321
I'm using Symfony 5 & Messenger to consume JSON messages from an external app via a Rabbitmq topic queue.
The consumer errors out on the serialization step. It is not making it to my handler yet, so I'll focus here on the serializer.
This is the error when consuming the first message it finds:
Encoded envelope does not have a "type" header.
I tried adding a type
property to the message header with the value of application/json
, and then json
, but I get similar errors:
Could not denormalize object of type "json", no supporting normalizer found.
Ideally I wouldn't need to add headers on the external system, but I can if necessary.
How can I configure Symfony or the message itself to get past the json serializer error?
Here is my setup:
Message example: ["ABC", 1]
messenger.yaml
:
framework:
messenger:
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: messenger.transport.symfony_serializer
retry_strategy:
delay: 500
options:
exchange:
name: tln_pot
type: topic
flags: 0
default_publish_routing_key: tln_pot
queues:
tln_pot:
binding_keys: [tln_pot]
bin/console config:dump framework messenger
...
serializer:
# Service id to use as the default serializer for the transports.
default_serializer: messenger.transport.native_php_serializer
symfony_serializer:
# Serialization format for the messenger.transport.symfony_serializer service (which is not the serializer used by default).
format: json
# Context array for the messenger.transport.symfony_serializer service (which is not the serializer used by default).
context:
# Prototype
name: ~
Upvotes: 0
Views: 5607
Reputation: 410
We recently did the same thing and for others to get the better view of this I am adding few things as the answer.
First of all the transport specified in your config is your sender if you send the message to the queue and the same is your receiver when you consume messages.
Actually you are receiving data from the external source and you can define your own serializer which will acts as your receiver and in the serializer you can decode or encode your messages and mold them according to messenger message format.
Our config for the messenger looks like this:
framework:
messenger:
# Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
# failure_transport: failed
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async: '%env(MESSENGER_TRANSPORT_DSN)%'
failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
external_messages:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Messenger\ExternalJsonMessageSerializer
routing:
# Route your messages to the transports
'App\Message\YourAqmpMessageClass': external_messages
Here in the ExternalJsonMessageSerializer class you can receive your decoded message and return the envelope of the message as per symfony message standard,
....
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'] ;
$headers = $encodedEnvelope['headers'];
$data = unserialize($body);
if (null === $data) {
throw new MessageDecodingFailedException('Invalid JSON');
}
// in case of redelivery, unserialize any stamps
$stamps = [];
if (isset($headers['stamps'])) {
$stamps = unserialize($headers['stamps']);
}
$envelope = new Envelope(new YourAqmpMessageClass(serialize($data)));
$envelope = $envelope->with(... $stamps);
return $envelope;
}
....
here is the quick view https://symfonycasts.com/screencast/messenger/transport-serializer
and the data we are sending to the queue from our sender app is
$sendDataToQueue = json_encode([
'headers'=>['type'=>'mail'],
'body' => serialize(['data'=>$data])
]);
then you can put that data to queue.
if you are consuming the messages through messenger then you need to follow Envelope/Message format.
When your external serializer will receive the message , the message will be json decoded already. All you need then is to properly check the message.
And your consume command will look like this:
bin/console messenger:consume external_messages -vv
Hope it clarifies things for you.
Further reading:
https://www.slideshare.net/CodeId/mastering-message-queues-tobias-nyholm-codeid
https://tool.lu/en_US/deck/jh/detail
Upvotes: 2