Reputation: 13486
I have two services, Manager and Collector.
COLLECTED_USER
with routingKey user.collected
and invokes a UserCollected
handler.COLLECT_USER
with routingKey user.collect
and invokes a CollectUser
handler.There can be multiple collectors so I have set exclusive
to false
(see below for code).
There are also other services that listen for events like
user.created
,user.updated
,user.deleted
In addition there are services that listen for more general events like
#.created
user.#
and so on.
So I am using a topic
exchange.
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
user.collect
user.collect
message and invokes a CollectUser
handlerCollectUser
handler does work, then publishes a message with routingKey user.collected
user.collected
message and invokes the UserCollected
handleruser.collect
(correct)user.collect
message and invokes a CollectUser
handler (correct)user.collect
message and invokes the UserCollected
handler with the wrong data. (wrong)CollectUser
handler does work, then publishes a message with routingKey user.collected
(correct)user.collected
message and invokes the UserCollected
handler (correct)Why does the Manager get the user.collect
message, given:
COLLECTED_USER
queue not the COLLECT_USER
queue, andCOLLECT_USER
queue, has already handled the message.I create the subscribers and publishers as follows (trimmed for relevance)
given the AMQP url
and params url
, exchange
, type
, routingKey
, queueName
and handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)
given the AMQP url
and params url
, exchange
, and type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
given the channel
and params exchange
, routingKey
, and message
await channel.publish(exchange, routingKey, message)
This question is a follow-on from RabbitMQ — Why are my Routing Keys being ignored when using topic exchange .
Upvotes: 3
Views: 1176
Reputation: 13486
I finally worked out what my problem was. A dirty exchange. While experimenting with this I'd inadvertently added an exchange that was routing messages to the wrong queue, and this was causing my confusion.
To fix it I fired up the RabbitMQ admin GUI and deleted all of the queues and let my code create the ones it needed. There was no issue with the code as outlined above.
Upvotes: 2