Dave Sag
Dave Sag

Reputation: 13486

RabbitMQ — Why does the wrong subscriber get a published message?

I have two services, Manager and Collector.

  1. Manager is subscribed to Queue COLLECTED_USER with routingKey user.collected and invokes a UserCollected handler.
  2. Collector is subscribed to Queue COLLECT_USER with routingKey user.collect and invokes a CollectUser handler.

There can be multiple collectors so I have set exclusive to false (see below for code).

There are also other services that listen for events like

In addition there are services that listen for more general events like

and so on.

So I am using a topic exchange.

Setup

| exchange | type  | routingKey     | queueName      |
| -------- | ----- | -------------- | -------------  |
| MY_APP   | topic | user.collect   | COLLECT_USER   |
| MY_APP   | topic | user.collected | COLLECTED_USER |

What should happen:

  1. Manager publishes message with routingKey user.collect
  2. Collector gets the user.collect message and invokes a CollectUser handler
  3. Collector's CollectUser handler does work, then publishes a message with routingKey user.collected
  4. Manager gets the user.collected message and invokes the UserCollected handler

What actually happens:

  1. Manager publishes message with routingKey user.collect (correct)
  2. Collector gets the user.collect message and invokes a CollectUser handler (correct)
  3. Manager also gets the user.collect message and invokes the UserCollected handler with the wrong data. (wrong)
  4. Collector's CollectUser handler does work, then publishes a message with routingKey user.collected (correct)
  5. Manager gets the user.collected message and invokes the UserCollected handler (correct)

My Question

Why does the Manager get the user.collect message, given:

  1. It's listening on the COLLECTED_USER queue not the COLLECT_USER queue, and
  2. The Collector, which is listening on the COLLECT_USER queue, has already handled the message.

Implementation details

I create the subscribers and publishers as follows (trimmed for relevance)

Creating a Subscriber

given the AMQP url and params url, exchange, type, routingKey, queueName and handler

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)

Creating a Publisher

given the AMQP url and params url, exchange, and type

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })

Publishing

given the channel and params exchange, routingKey, and message

await channel.publish(exchange, routingKey, message)

Note

This question is a follow-on from RabbitMQ — Why are my Routing Keys being ignored when using topic exchange .

Upvotes: 3

Views: 1176

Answers (1)

Dave Sag
Dave Sag

Reputation: 13486

I finally worked out what my problem was. A dirty exchange. While experimenting with this I'd inadvertently added an exchange that was routing messages to the wrong queue, and this was causing my confusion.

To fix it I fired up the RabbitMQ admin GUI and deleted all of the queues and let my code create the ones it needed. There was no issue with the code as outlined above.

Upvotes: 2

Related Questions