OrangeDog

Reputation: 38777

How to handle a consumer exception as an ack?

I am trying to implement exponential backoff for consumer failures. To that end I have three queues chained by dead-letter exchanges (DLX): RETRY -> MAIN -> FAILED.

Anything rejected from MAIN goes to FAILED, and anything added to RETRY is dead-lettered into MAIN once its per-message TTL expires. The consumer receives from MAIN.
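For reference, the chain comes down to two sets of queue arguments. This is only a minimal sketch: it assumes the default exchange (empty name) is used as the DLX, so that the dead-letter routing key is simply the target queue name. The class and method names are hypothetical; the maps would be passed as the queue arguments when declaring each queue.

```java
import java.util.Map;

// Hypothetical holder for the queue arguments of the RETRY -> MAIN -> FAILED chain.
final class RetryTopology {
    // Assumption: the default exchange routes a message to the queue named by its routing key.
    static final String DEFAULT_EXCHANGE = "";

    // Arguments that make a queue dead-letter its messages into targetQueue.
    static Map<String, Object> deadLetterTo(String targetQueue) {
        return Map.of(
            "x-dead-letter-exchange", DEFAULT_EXCHANGE,
            "x-dead-letter-routing-key", targetQueue);
    }

    // RETRY has no consumer; messages leave it only when their per-message TTL expires.
    static final Map<String, Object> RETRY_ARGS = deadLetterTo("MAIN");
    // Messages rejected from MAIN without requeue end up in FAILED.
    static final Map<String, Object> MAIN_ARGS = deadLetterTo("FAILED");
    // FAILED is the end of the chain and needs no DLX.
    static final Map<String, Object> FAILED_ARGS = Map.of();
}
```

Note that no queue-level x-message-ttl is set on RETRY in this sketch, since the TTL is meant to vary per message.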

I've implemented an ErrorHandler and set it on the SimpleRabbitListenerContainerFactory. This handler either computes a new TTL and sends the message to the RETRY queue or, if that's not possible or retries are exceeded, throws AmqpRejectAndDontRequeueException so that the message is dead-lettered to FAILED. The problem is, I cannot work out how to get rid of the original message.

As far as I can see I have to ack it, but the Channel is not available in the error handler, and there are no other exceptions to throw that would trigger an ack.

If I instead remove the MAIN -> FAILED DLX and switch to manually publishing messages to FAILED, then I lose the message whenever that publish fails.

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
    // what to do here?
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }
  // or here?
}
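The TTL computation inside queueForExponentialRetry is not shown here. A minimal sketch of what it might look like, assuming a doubling delay with an upper cap; the class name, method names, and parameters are hypothetical, and how the attempt count is tracked (for example in a message header) is left out. RabbitMQ expects the per-message TTL in the expiration property as a string of milliseconds.

```java
// Hypothetical helper: doubling backoff with an upper cap.
final class RetryBackoff {
    // Delay for a 0-based attempt: base, 2*base, 4*base, ... but never more than capMillis.
    static long ttlMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 0 || baseMillis <= 0 || capMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long ttl = baseMillis;
        for (int i = 0; i < attempt && ttl < capMillis; i++) {
            // Jump straight to the cap when doubling would exceed it (this also avoids overflow).
            ttl = ttl > capMillis / 2 ? capMillis : ttl * 2;
        }
        return Math.min(ttl, capMillis);
    }

    // The value to put in the message's expiration property.
    static String expirationFor(int attempt, long baseMillis, long capMillis) {
        return Long.toString(ttlMillis(attempt, baseMillis, capMillis));
    }
}
```

With a base of 1000 ms and a cap of 60000 ms, attempts 0, 1, 2, 3 give 1000, 2000, 4000 and 8000 ms, and attempt 6 onwards stays at the cap.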

Upvotes: 1

Views: 896

Answers (1)

OrangeDog

Reputation: 38777

I think I immediately found the answer. I missed it before because I was throwing from the wrong place.

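@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    // Republish the failed message to RETRY with a new per-message TTL.
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
  } catch (RuntimeException ex) {
    // Republish failed or retries exceeded: reject without requeue so the DLX routes it to FAILED.
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }

  // Republish succeeded: ack the original message so it is removed from MAIN.
  throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}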

ImmediateAcknowledgeAmqpException

Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.

This should be safe as I'm not using batches or transactions, only publisher returns.


Side note: I should also be aware that exponential backoff isn't actually going to work properly with a single RETRY queue and per-message TTL:

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.
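To make the effect concrete, here is a small simulation of that head-of-queue rule. It is a simplified model, not RabbitMQ's implementation: it assumes a queue with no consumers (like RETRY), messages listed in publish order, and that a message can only be dead-lettered once everything ahead of it has left. The class and method names are hypothetical.

```java
// Simplified model of per-message TTL on a FIFO queue without consumers.
final class HeadOfQueueExpiry {
    // Message i is published at publishedAtMillis[i] with TTL ttlMillis[i], in queue order.
    // Returns the time at which each message is actually dead-lettered.
    static long[] deadLetterTimes(long[] publishedAtMillis, long[] ttlMillis) {
        if (publishedAtMillis.length != ttlMillis.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long[] leavesAt = new long[publishedAtMillis.length];
        long previousLeftAt = Long.MIN_VALUE; // nothing is ahead of the first message
        for (int i = 0; i < leavesAt.length; i++) {
            long expiresAt = publishedAtMillis[i] + ttlMillis[i];
            // An expired message still waits until it reaches the head of the queue.
            leavesAt[i] = Math.max(expiresAt, previousLeftAt);
            previousLeftAt = leavesAt[i];
        }
        return leavesAt;
    }
}
```

In this model, a message with a 1000 ms TTL published right behind one with a 60000 ms TTL is not dead-lettered until the 60000 ms mark, so a first retry can end up waiting as long as somebody else's fifth.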

Upvotes: 1
