Nikolai  Shevchenko
Nikolai Shevchenko

Reputation: 7531

How to intercept message republished to DLQ in Spring Cloud RabbitMQ?

I want to intercept messages that are republished to DLQ after retry limit is exhausted, and my ultimate goal is to eliminate x-exception-stacktrace header from those messages.

Config:

spring:
  application:
    name: sandbox
  cloud:
    function:
      definition: rabbitTest1Input
    stream:
      binders:
        rabbitTestBinder1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: localhost:55015
                username: guest
                password: guest
                virtual-host: test
 
      bindings:
        rabbitTest1Input-in-0:
          binder: rabbitTestBinder1
          consumer:
            max-attempts: 3
          destination: ex1
          group: q1
      rabbit:
        bindings:
          rabbitTest1Input-in-0:
            consumer:
              autoBindDlq: true
              bind-queue: true
              binding-routing-key: q1key
              deadLetterExchange: ex1-DLX
              dlqDeadLetterExchange: ex1
              dlqDeadLetterRoutingKey: q1key_dlq
              dlqTtl: 180000
              prefetch: 5
              queue-name-group-only: true
              republishToDlq: true
              requeueRejected: false
              ttl: 86400000
@Configuration
class ConsumerConfig {

    companion object : KLogging()

    @Bean
    fun rabbitTest1Input(): Consumer<Message<String>> {
        return Consumer {
            logger.info("Received from test1 queue: ${it.payload}")
            throw AmqpRejectAndDontRequeueException("FAILED")  // force republishing to DLQ after N retries
        }
    }
}

First I tried to register @GlobalChannelInterceptor (like here), but since RabbitMessageChannelBinder uses its own private RabbitTemplate instance (not autowired) for republishing (see #getErrorMessageHandler) it doesn't get intercepted.

Then I tried to extend RabbitMessageChannelBinder class by throwing away the code related to x-exception-stacktrace and then declare this extension as a bean:

/**
 * Forked from {@link org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder} with the goal
 * to eliminate {@link RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE} header from messages republished to DLQ
 */
class RabbitMessageChannelBinderWithNoStacktraceRepublished 
    : RabbitMessageChannelBinder(...)

// and then

@Configuration
@Import(
    RabbitAutoConfiguration::class,
    RabbitServiceAutoConfiguration::class,
    RabbitMessageChannelBinderConfiguration::class,
    PropertyPlaceholderAutoConfiguration::class,
)
@EnableConfigurationProperties(
    RabbitProperties::class,
    RabbitBinderConfigurationProperties::class,
    RabbitExtendedBindingProperties::class
)
class RabbitConfig {

    @Bean
    @Primary
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Order(Ordered.HIGHEST_PRECEDENCE)
    fun customRabbitMessageChannelBinder(
        appCtx: ConfigurableApplicationContext,
        ... // required injections
    ): RabbitMessageChannelBinder {

        // remove the original (auto-configured) bean. Explanation is after the code snippet
        val registry = appCtx.autowireCapableBeanFactory as BeanDefinitionRegistry
        registry.removeBeanDefinition("rabbitMessageChannelBinder")

        // ... and replace it with custom binder. It's initialized absolutely the same way as original bean, but is of forked class
        return RabbitMessageChannelBinderWithNoStacktraceRepublished(...)
    }
}

But in this case my channel binder doesn't respect the YAML properties (e.g. addresses: localhost:55015) and uses default values (e.g. localhost:5672)

INFO  o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: [localhost:5672]
INFO  o.s.a.r.l.SimpleMessageListenerContainer - Broker not available; cannot force queue declarations during start: java.net.ConnectException: Connection refused

On the other hand if I don't remove original binder from Spring context I get following error:

Caused by: java.lang.IllegalStateException: Multiple binders are available, however neither default nor per-destination binder name is provided. Available binders are [rabbitMessageChannelBinder, customRabbitMessageChannelBinder]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:145)

Could anyone give me a hint how to solve this problem?

P.S. I use Spring Cloud Stream 3.1.6 and Spring Boot 2.6.6

Upvotes: 1

Views: 985

Answers (1)

Gary Russell
Gary Russell

Reputation: 174664

  1. Disable the binder retry/DLQ configuration (maxAttempts=1, republishToDlq=false, and other dlq related properties).
  2. Add a ListenerContainerCustomizer to add a custom retry advice to the advice chain, with a customized dead letter publishing recoverer.
  3. Manually provision the DLQ using a Queue @Bean.
@SpringBootApplication
public class So72871662Application {

    public static void main(String[] args) {
        SpringApplication.run(So72871662Application.class, args);
    }

    @Bean
    public Consumer<String> input() {
        return str -> {
            System.out.println();
            throw new RuntimeException("test");
        };
    }

    @Bean
    ListenerContainerCustomizer<MessageListenerContainer> customizer(RetryOperationsInterceptor retry) {
        return (cont, dest, grp) -> {
            ((AbstractMessageListenerContainer) cont).setAdviceChain(retry);
        };
    }

    @Bean
    RetryOperationsInterceptor interceptor(MessageRecoverer recoverer) {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3)
                .backOffOptions(3_000L, 2.0, 10_000L)
                .recoverer(recoverer)
                .build();
    }

    @Bean
    MessageRecoverer recoverer(RabbitTemplate template) {
        return new RepublishMessageRecoverer(template, "DLX", "errors") {

            @Override
            protected void doSend(@Nullable
            String exchange, String routingKey, Message message) {

                message.getMessageProperties().getHeaders().remove(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE);
                super.doSend(exchange, routingKey, message);
            }

        };
    }

    @Bean
    FanoutExchange dlx() {
        return new FanoutExchange("DLX");
    }

    @Bean
    Queue dlq() {
        return new Queue("errors");
    }

    @Bean
    Binding dlqb() {
        return BindingBuilder.bind(dlq()).to(dlx());
    }

}

Upvotes: 1

Related Questions