christophetd
christophetd

Reputation: 3874

Making sure a message published on a topic exchange is received by at least one consumer

TLDR; In the context of a topic exchange and queues created on the fly by the consumers, how to have a message redelivered / the producer notified when no consumer consumes the message?

I have the following components:

I currently have a single RabbitMQ topic exchange.

Now - this works properly, but it still has a major issue. Currently, if the publisher sends a message with a routing key that no consumer is bound to, the message will be lost. This is because even if the queue created by the consumers is durable, it is destroyed as soon as the consumer disconnects since it is unique to this consumer.

Consumer code (python):

channel.exchange_declare(exchange=exchange_name, type='topic', durable=True)
result = channel.queue_declare(exclusive = True, durable=True)
queue_name = result.method.queue

topics = [ "pictures.*", "videos.trending" ]
for topic in topics:
    channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)

channel.basic_consume(my_handler, queue=queue_name)
channel.start_consuming()

Loosing a message in this condition is not acceptable in my use case.

Attempted solution

However, "loosing" a message becomes acceptable if the producer is notified that no consumer received the message (in this case it can just resend it later). I figured out the mandatory field could help, since the specification of AMQP states:

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method.

This is indeed working - in the producer, I am able to register a ReturnListener :

rabbitMq.confirmSelect();  

rabbitMq.addReturnListener( (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) -> {
    log.info("A message was returned by the broker");
});
rabbitMq.basicPublish(exchangeName, "pictures.profile", true /* mandatory */, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBytes);

This will as expected print A message was returned by the broker if a message is sent with a routing key no consumer is bound to.

Now, I also want to know when the message was correctly received by a consumer. So I tried registering a ConfirmListener as well:

rabbitMq.addConfirmListener(new ConfirmListener() {
    void handleAck(long deliveryTag, boolean multiple) throws IOException {
        log.info("ACK message {}, multiple = ", deliveryTag, multiple);
    }

    void handleNack(long deliveryTag, boolean multiple) throws IOException {
        log.info("NACK message {}, multiple = ", deliveryTag, multiple);
    }
});

The issue here is that the ACK is sent by the broker, not by the consumer itself. So when the producer sends a message with a routing key K:

Cf the docs:

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

So while my problem is theoretically solvable using this, it would make the logic of knowing if a message was correctly consumed very complicated (especially in the context of multi threading, persistence in a database, etc.):

send a message

on receive ACK:
    if no basic.return was received for this message
       the message was correctly consumed
    else
       the message wasn't correctly consumed

on receive basic.return
    the message wasn't correctly consumed

Possible other solutions


I would love to have some inputs on that - thanks!


Related to:

[edit] Solution

I ended up implementing something that uses a basic.return, as mentioned earlier. It is actually not so tricky to implement, you just have to make sure that your method producing the messages and the method handling the basic returns are synchronized (or have a shared lock if not in the same class), otherwise you can end up with interleaved execution flows that will mess up your business logic.

Upvotes: 8

Views: 3192

Answers (1)

Olivier
Olivier

Reputation: 2691

I believe that an alternate exchange would be the best fit for your use case for the part regarding the identification of not routed messages.

Whenever an exchange with a configured AE cannot route a message to any queue, it publishes the message to the specified AE instead.

Basically upon creation of the "main" exchange, you configure an alternate exchange for it. For the referenced alternate exchange, I tend to go with a fanout, then create a queue (notroutedq) binded to it. This means any message that is not published to at least one of the queues bound to your "main" exchange will end up in the notroutedq

Now regarding your statement:

because even if the queue created by the consumers is durable, it is destroyed as soon as the consumer disconnects since it is unique to this consumer.

Seems that you have configured your queues with auto-delete set to true. If so, in case of disconnect, as you stated, the queue is destroyed and the messages still present on the queue are lost, case not covered by the alternate exchange configuration.

It's not clear from your use case description whether you'd expect in some cases for a message to end up in more than one queue, seemed more a case of one queue per type of processing expected (while keeping the grouping flexible). If indeed the queue split is related to type of processing, I do not see the benefit of setting the queue with auto-delete, expect maybe not having to do any cleanup maintenance when you want to change the bindings.

Assuming you can go with durable queues, then a dead letter exchange (would again go with fanout) with a binding to a dlq would cover the missing cases.

  • not routed covered by alternate exchange
  • correct processing already handled by your processing_result queue
  • problematic processing or too long to be processed covered by the dead letter exchange, in which case the additional headers added upon dead lettering the message can even help to identify the type of actions to take

Upvotes: 2

Related Questions