vikki

Reputation: 321

Spring Cloud Stream + Spring Retry: how to add a recovery callback and disable the logic that sends to the DLQ?

I'm using Spring Cloud Stream with the RabbitMQ binder.

In my @StreamListener I want to apply retry logic on specific exceptions using a RetryTemplate. After retries are exhausted, or when a non-retryable error is thrown, I would like to add a recovery callback that saves a new record with an error message to my Postgres DB and finishes with the message (moves on to the next one). Here is what I have so far:

  @StreamListener(Sink.INPUT)
  public void saveUser(User user) {
    // Use a separate name: redeclaring the "user" parameter does not compile.
    User savedUser = userService.saveUser(user); // could throw exceptions
    log.info(">>>>>>User is created successfully: {}", savedUser);
  }

  @StreamRetryTemplate
  public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());

    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
    retryableExceptions.put(ConnectionException.class, true);
    retryTemplate.registerListener(new RetryListener() {
      @Override
      public <T, E extends Throwable> boolean open(RetryContext context,
        RetryCallback<T, E> callback) {
        return true;
      }

      @Override
      public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
        Throwable throwable) {
        // could add recovery logic here, e.g. save an error to the DB explaining why a certain user was not saved
        log.info("retries exhausted");
      }

      @Override
      public <T, E extends Throwable> void onError(RetryContext context,
        RetryCallback<T, E> callback, Throwable throwable) {
        log.error("Error on retry", throwable);
      }
    });

    retryTemplate.setRetryPolicy(
      new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
    return retryTemplate;
  }

In my properties I only have these (no DLQ configuration at all):

spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer

After retries are exhausted, I get this log:

2020-06-01 20:05:58.674  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://[email protected]:56722/, localPort= 50728]
2020-06-01 20:05:58.697  INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration           : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)

After the RetryListener close method is triggered, I can see that the listener tries to connect to the DLX, probably to publish an error message. I don't want it to do that, and I don't want to see this error message in the log each time.

So my questions are:

1) Where do I add a RecoveryCallback for my retryTemplate? I could presumably write my recovery logic (saving the error to the DB) in the RetryListener#close method, but there should definitely be a more appropriate way to do that.

2) How do I configure the RabbitMQ binder not to send messages to the DLQ? Maybe I could override some method? Currently, after retries are exhausted (or a non-retryable error occurs), the listener tries to send a message to the DLX and logs an error that it couldn't find it. I don't need any messages to be sent to a DLQ in my application; I only need to save the error to the DB.

Upvotes: 1

Views: 1244

Answers (1)

Gary Russell

Reputation: 174554

There is currently no mechanism to provision a custom recovery callback.

Set republishToDlq to false (that used to be the default). The default was changed to true, which is wrong if autoBindDlq is false (its default); I will open an issue for that.
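
As a sketch, assuming the consumer binding is named input (as in the question's spring.cloud.stream.bindings.input.* properties), the Rabbit-specific consumer property would sit next to the existing ones like this:

```properties
# Assumption: the binding name is "input", taken from the question's configuration.
# Stops the binder from republishing failed messages to the DLX once retries are exhausted.
spring.cloud.stream.rabbit.bindings.input.consumer.republishToDlq=false
```

Note that this property lives under spring.cloud.stream.rabbit.bindings, not under the generic spring.cloud.stream.bindings prefix used for destination and group.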

Then, when retries are exhausted, the exception will be thrown back to the container; you can use a ListenerContainerCustomizer to add a custom ErrorHandler.

However, the data you get there will be a ListenerExecutionFailedException with the raw (unconverted) Spring AMQP Message in its failedMessage property, not your User object.
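
Since only the raw body is available at that point, the error record has to be built from bytes rather than from a User. The following is a minimal plain-Java sketch of that step, not binder API: ErrorRecord, rootCause and fromFailure are hypothetical names, and it assumes the body is UTF-8 text (for example JSON). In a real error handler the byte array would come from the failed Spring AMQP Message's getBody(), and the resulting record would be passed to your own repository.

```java
import java.nio.charset.StandardCharsets;

public class ErrorRecordSketch {

    /** Hypothetical row to persist; adapt the fields to your own table. */
    static final class ErrorRecord {
        final String payload;
        final String errorMessage;

        ErrorRecord(String payload, String errorMessage) {
            this.payload = payload;
            this.errorMessage = errorMessage;
        }
    }

    /** Walks the cause chain to the innermost exception, since wrapper
     *  exceptions tend to hide the error thrown by the listener. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** Builds the record from the raw message body and the failure.
     *  Assumption: the body is UTF-8 encoded text. */
    static ErrorRecord fromFailure(byte[] rawBody, Throwable failure) {
        String payload = new String(rawBody, StandardCharsets.UTF_8);
        Throwable root = rootCause(failure);
        String message = root.getClass().getSimpleName() + ": " + root.getMessage();
        return new ErrorRecord(payload, message);
    }

    public static void main(String[] args) {
        byte[] body = "{\"name\":\"vikki\"}".getBytes(StandardCharsets.UTF_8);
        Throwable failure = new RuntimeException("listener failed",
                new IllegalStateException("connection refused"));
        ErrorRecord rec = fromFailure(body, failure);
        System.out.println(rec.payload + " -> " + rec.errorMessage);
    }
}
```

Keeping the conversion in a small static helper like this makes it easy to unit test without a broker, whichever hook ends up calling it.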

EDIT

You can add a listener to the binding's error channel, which is named <destination>.<group>.errors (here user-topic.user-consumer.errors)...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So62137618Application {

    public static void main(String[] args) {
        SpringApplication.run(So62137618Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @ServiceActivator(inputChannel = "user-topic.user-consumer.errors")
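    // The error channel payload is an org.springframework.messaging.MessagingException, not a String.
    public void errors(MessagingException in) {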
        System.out.println("Retries exhausted for " + new String((byte[]) in.getFailedMessage().getPayload()));
    }

}

Upvotes: 1
