Patrik Mihalčin
Patrik Mihalčin

Reputation: 3961

Selectively consume messages based on message body attributes in RabbitMQ

Let's say I have a situation where I need to wait for up to 1 minute for some action to be performed.

If it is expired, then try different action.

My current solution proposal is based on RabbitMQ features.

I would create following resources:

@Bean
DirectExchange exchangeDirect() {
    return new DirectExchange("exchange.direct");
}

@Bean
Queue bufferQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", amqpProperties.getTimeToLive().toMillis());
    args.put("x-dead-letter-exchange", "exchange.direct");
    args.put("x-dead-letter-routing-key", "timedOutQueue");
    return new Queue("buffer.queue", true, false, false, args);
}

@Bean
Queue timedOutQueue() {
    return new Queue("timed.out.queue", true);
}

@Bean
Binding bufferQueueToExchangeDirect() {
    return bind(bufferQueue())
            .to(exchangeDirect())
            .with("buffer.queue");
}

@Bean
Binding timedOutQueueToExchangeDirect() {
    return bind(timedOutQueue())
            .to(exchangeDirect())
            .with("timed.out.queue");
}

When I add action to bufferQueue and I don't receive any delivery update within 1 minute, this request is then moved to timedOutQueue thanks to bufferQueue's TTL.

I can attach application rabbit listener to timedOutQueue and use different action.

When I add action to bufferQueue and I receive confirmation that action was successfully performed, I'd like to remove given action event from bufferQueue.

I couldn't find such feature in RabbitMQ, i.e. being able to receive selectively.

I also found some articles saying that selective consuming is antipattern.

Is it possible to selectively consume messages from RabbitMQ queue?

What is proper way to implement this pattern in RabbitMQ?

Upvotes: 0

Views: 922

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

There is no concept of message selection in RabbitMQ.

The "proper" way for an application that wants to selectively receive messages is to use multiple queues/routing keys with a consumer on each specific queue he expresses interest in.

However, there is no way to "remove" a message from the middle of a queue; only the head.

When I add action to bufferQueue and I receive confirmation that action was successfully performed, I'd like to remove given action event from bufferQueue.

That makes no sense to me; when the message timed out in bufferQueue due to TTL, and was moved to timedOutQueue, it no longer exists in bufferQueue so there is nothing to remove.

There is also no mechanism to ...

and I don't receive any delivery update within 1 minute,

... because each message in a queue is independent.

It doesn't sound like your application is suitable for a message broker at all.

Upvotes: 2

Related Questions