Reputation: 38777
I am trying to implement exponential backoff for consumer failures. To that end I have three queues with DLX thus: RETRY -> MAIN -> FAILED
.
Anything rejected from MAIN
goes FAILED
, and anything added to RETRY
goes into MAIN
after a per-message TTL. The consumer receives from MAIN
.
I've implemented an ErrorHandler
and set it on the SimpleRabbitListenerContainerFactory
. This handler either computes a new TTL and sends the message to the RETRY
queue, or throws AmqpRejectAndDontRequeueException
if that's not possible or retries are exceeded in order to DLX it to FAILED
. The problem is, I cannot work out how to get rid of the original message.
As far as I can see I have to ack it, but the Channel
is not available in the error handler, and there are no other exceptions to throw that would trigger an ack.
If instead I remove the MAIN -> FAILED
DLX and switch to manually adding messages to FAILED
, then if that doesn't work I've lost the message.
@Override
public void handleError(Throwable t) {
log.warn("Execution of Rabbit message listener failed.", t);
try {
queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
// what to do here?
} catch (RuntimeException ex) {
t.addSuppressed(ex);
log.error("Not requeueing after failure", t);
throw new AmqpRejectAndDontRequeueException(t);
}
// or here?
}
Upvotes: 1
Views: 896
Reputation: 38777
I think I immediately found the answer. Missed it before because I was throwing from the the wrong place.
@Override
public void handleError(Throwable t) {
log.warn("Execution of Rabbit message listener failed.", t);
try {
queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
} catch (RuntimeException ex) {
t.addSuppressed(ex);
log.error("Not requeueing after failure", t);
throw new AmqpRejectAndDontRequeueException(t);
}
throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}
ImmediateAcknowledgeAmqpException
Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.
This should be safe as I'm not using batches or transactions, only publisher returns.
Side note: I should also be aware that exponential backoff isn't going to actually work properly:
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.
Upvotes: 1