lambodar
lambodar

Reputation: 3783

Put unacknowledged message(on exception) to a different queue in RabbitMQ

On exception while processing the message from RabbitMQ, i just wanted to unacknowledg and put back the particular message to a difference queue instead or re-queue to same queue or completely discard the message(as per the last Boolean flag@requeue in basicNack).

The whole idea is later on i can get the count of unack message and check the message format etc instead of re-queue again and again to same channel and also i want to sent unacknowledged signal to current channel.

FYI I set the channel ack mode as manual(i.e container.setAcknowledgeMode(AcknowledgeMode.MANUAL);)

This is what i am doing now.

public class My***Listener implements ChannelAwareMessageListener{

try{

    @Override
    public void onMessage(Message message,Channel channel) throws Exception {   
    String s = new String(message.getBody());
    //some logic
    //after successful ack manually
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
catch(Exception e){
      //currently on exception i am unack the channel
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}

Any help is highly appreciable.

Upvotes: 2

Views: 4456

Answers (3)

Gary Russell
Gary Russell

Reputation: 174759

If you prefer to use declarations, see this answer for an example.

To route messages to the DLX, you can set defaultRequeuRejected to false (on the listener container). Or you can throw an AmqpRejectAndDontRequeueException to tell the container you want this message to be rejected (and not requeued).

The container's default behavior is to requeue rejected messages.

You can use a retry interceptor with a RejectAndDontRequeueRecoverer to automatically throw the exception; or, as @Jawo99 says, you can use a republish recoverer - this has the added benefit of adding the stack trace as a header. The DLX just routes the original message.

Upvotes: 1

Jaiwo99
Jaiwo99

Reputation: 10017

You need something like this:

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .withMaxAttempts(5)
            .setRecoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
            .build();
}

Notice: It works only with spring-amqp 1.3+ see also the Reference

Upvotes: 3

Enno Shioji
Enno Shioji

Reputation: 26882

You can send them to a dead letter queue. It's a pretty standard pattern.

https://www.rabbitmq.com/dlx.html

Upvotes: 4

Related Questions