Christoph

How to configure consumer-level transactional redelivery with Camel and IBM MQ

I am trying to build a transactional JMS client in Java with Spring Boot and Apache Camel that connects to IBM MQ. In addition, the client needs to apply an exponential back-off redelivery behavior when message processing fails. The reason: messages from MQ need to be processed and forwarded to external systems that may be down for maintenance for many hours. Using transactions to get at-least-once processing guarantees seems the appropriate solution to me.

I have researched this topic for many hours and have not been able to find a solution. I will start with what I currently have:

  @Bean
  UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
      throws IOException {
    MQConnectionFactory factory = new MQConnectionFactory();
    factory.setCCDTURL(tabFilePath);

    UserCredentialsConnectionFactoryAdapter adapter =
        new UserCredentialsConnectionFactoryAdapter();
    adapter.setTargetConnectionFactory(factory);
    adapter.setUsername(userName);
    adapter.setPassword(password);

    return adapter;
  }

  @Bean
  PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
    JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
    return txMgr;
  }

  @Bean()
  CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,
      @Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
    return new CamelContextConfiguration() {
      @Override
      public void beforeApplicationStart(CamelContext context) {
        JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter, txMgr);
        // required for consumer-level redelivery after rollback
        jmsComponent.setCacheLevelName("CACHE_CONSUMER");
        jmsComponent.setTransacted(true);
        jmsComponent.getConfiguration().setConcurrentConsumers(1);

        context.addComponent("jms", jmsComponent);
      }

      @Override
      public void afterApplicationStart(CamelContext camelContext) {
        // Do nothing
      }
    };
  }

// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
    .transacted()
    .to("direct:processMessage");
...

I was able to verify the transactional behavior through integration tests. If an unhandled exception occurs during message processing, the transaction gets rolled back and retried. The problem is, it gets immediately retried, several times per second, causing possibly significant load on the IBM MQ manager and external system.

For ActiveMQ, redelivery policies are easy to configure, with plenty of examples on the net. The ActiveMQConnectionFactory has a setRedeliveryPolicy method, meaning the ActiveMQ client library has redelivery logic built in. From all I can tell, this is in line with the documentation of Camel's Transactional Client EIP, which states:

The redelivery in transacted mode is not handled by Camel but by the backing system (the transaction manager). In such cases you should resort to the backing system how to configure the redelivery.

What I absolutely can't figure out is how to achieve the same thing for IBM MQ. IBM's MQConnectionFactory does not have any support for redelivery policies. In fact, searching for redeliverypolicy in the MQ Knowledge Center brings up exactly... drumroll... 0 hits. I even looked a bit through the implementation of the MQConnectionFactory and didn't discover anything either.

Another backing system I looked into was the JmsTransactionManager. Searches for "jmstransactionmanager redelivery policy" or "jmstransactionmanager exponential backoff" did not turn up anything useful either. There was some talk about TransactionTemplate and AbstractMessageListenerContainer but 1) I didn't see any connection to redelivery policies, and 2) I could not figure out how those interact with Camel and JMS.

Sooo, does anybody have any idea how to implement exponential backoff redelivery policies with Apache Camel and IBM MQ?

Closing note: the redelivery policies that Camel supports on errorHandler and onException are not the same as redelivery policies in the transaction/connection backing system. Those handlers retry at the point of failure, using the Exchange object in whatever state it is in, without rolling back and reprocessing the message from the start of the route. The transaction remains active during the entire retry period, and a rollback only occurs when the errorHandler or onException gives up. This is not what I want for retries that may go on for many hours.

Answers (1)

Christoph

Looks like @JoshMc pointed me in the right direction. I managed to implement a RoutePolicy that delays redeliveries with increasing delays. I ran a test session for a few hours with several thousand redeliveries of the same message to check for problems such as memory leaks or MQ connection exhaustion, and did not observe any. There were two stable TCP connections to the MQ manager, and memory usage of the Java process stayed within a narrow range.

import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Session;
import lombok.extern.log4j.Log4j2;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Route;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.support.RoutePolicySupport;

@Log4j2
public class ExponentialBackoffPolicy extends RoutePolicySupport implements CamelContextAware {
  final static String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
  private CamelContext camelContext;

  @Override
  public void setCamelContext(CamelContext camelContext) {
    this.camelContext = camelContext;
  }

  @Override
  public CamelContext getCamelContext() {
    return this.camelContext;
  }

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    try {
      // ideally we would check if the exchange is transacted but onExchangeDone is called after the
      // transaction is already rolled back, and the transaction context has already been removed.
      if (exchange.getException() == null) {
        log.debug("No exception occurred, skipping route suspension.");
        return;
      }

      int deliveryCount = getRetryCount(exchange);
      int redeliveryDelay = getRedeliveryDelay(deliveryCount);
      log.info("Suspending route {} for {}ms after exception. Current delivery count {}.",
          route.getId(), redeliveryDelay, deliveryCount);

      super.suspendRoute(route);
      scheduleWakeup(route, redeliveryDelay);
    } catch (Exception ex) {
      // only log the exception and let Camel continue as if this policy didn't exist.
      log.error("Exception while suspending route", ex);
    }
  }

  void scheduleWakeup(Route route, int redeliveryDelay) {
    Timer timer = new Timer();
    timer.schedule(
        new TimerTask() {
          @Override
          public void run() {
            log.info("Resuming route {} after redelivery delay of {}ms.", route.getId(), redeliveryDelay);
            try {
              resumeRoute(route);
            } catch (Exception ex) {
              // only log the exception and let Camel continue as if this policy didn't exist.
              log.error("Exception while resuming route", ex);
            }
            timer.cancel();
          }
        },
        redeliveryDelay);
  }

  int getRetryCount(Exchange exchange) {
    Message msg = exchange.getIn();
    return (int) msg.getHeader(JMSX_DELIVERY_COUNT, 1);
  }

  int getRedeliveryDelay(int deliveryCount) {
    // very crude backoff strategy for now, will need to refine later
    if (deliveryCount < 10) return 1000;
    if (deliveryCount < 20) return 5000;
    if (deliveryCount < 30) return 20000;
    return 60000;
  }
}

And this is how it is used in route definitions:

    // To distinguish between retriable and non-retriable situations, add the following two exception handlers.
    // Global onException clauses must be declared before any from(...) in the same RouteBuilder.configure().
    onException(NonRetriableProcessingException.class)
        .handled(true)
        .log(LoggingLevel.WARN, "Non-retriable exception occurred, discard message.");

    onException(Exception.class)
        .handled(false)
        .log(LoggingLevel.WARN, "Retriable exception occurred, retry message.");

    from(mqConnectionString)
        .routePolicy(new ExponentialBackoffPolicy())
        .transacted()
        ...
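The scheduleWakeup method in the policy creates a new java.util.Timer, and therefore a new background thread, for every failed exchange. That works, but the same suspend-then-resume-after-delay pattern can also be sketched with one shared ScheduledExecutorService. This is a JDK-only sketch under stated assumptions: PausableConsumer and BackoffScheduler are hypothetical stand-ins (in the real policy the suspend/resume calls would be suspendRoute(route) and resumeRoute(route)), and it assumes that a failure reported while the consumer is already suspended should not schedule a second wake-up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for something that can stop and restart consumption
// (in the Camel policy: suspendRoute(route) / resumeRoute(route)).
final class PausableConsumer {
    private final AtomicBoolean suspended = new AtomicBoolean(false);

    /** Returns true only if this call actually moved the consumer into the suspended state. */
    boolean suspend() {
        return suspended.compareAndSet(false, true);
    }

    void resume() {
        suspended.set(false);
    }

    boolean isSuspended() {
        return suspended.get();
    }
}

// Sketch: one shared scheduler thread for all wake-ups instead of a new Timer per failure.
final class BackoffScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "redelivery-backoff");
            thread.setDaemon(true); // pending wake-ups do not keep the JVM alive
            return thread;
        });

    /**
     * Suspends the consumer and schedules it to resume after delayMs.
     * Returns false (and schedules nothing) if the consumer was already suspended.
     */
    boolean suspendFor(PausableConsumer consumer, long delayMs) {
        if (!consumer.suspend()) {
            return false;
        }
        scheduler.schedule(consumer::resume, delayMs, TimeUnit.MILLISECONDS);
        return true;
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // drops wake-ups that have not fired yet
    }

    public static void main(String[] args) throws InterruptedException {
        PausableConsumer consumer = new PausableConsumer();
        try (BackoffScheduler backoff = new BackoffScheduler()) {
            backoff.suspendFor(consumer, 200);
            System.out.println("suspended right after failure: " + consumer.isSuspended());
            Thread.sleep(1_000);
            System.out.println("suspended after the delay: " + consumer.isSuspended());
        }
    }
}
```

A single-threaded scheduler is probably enough here because each wake-up only flips a flag (or, in the Camel case, resumes a route). Whether a daemon thread is appropriate depends on how the application manages its shutdown.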

One thing to note: the JMSXDeliveryCount header comes from the MQ manager, and the redelivery delay is calculated from it. If you restart the application while a message is failing permanently, it will immediately try to reprocess that message after startup; on another failure, it applies the delay corresponding to the total number of deliveries so far rather than starting over with the initial short delay.
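The getRedeliveryDelay method in the policy is described as a crude, stepped strategy. A minimal sketch of a true exponential schedule keyed on the same delivery count could look like this, in plain Java; the ExponentialBackoff class and the parameter values (1 s initial delay, factor 2, one-hour cap) are assumptions for illustration, not part of the original policy.

```java
// Sketch of an exponential back-off keyed on the JMS delivery count.
// Class name and parameter values are illustrative assumptions, not an MQ or Camel API.
final class ExponentialBackoff {
    private final long initialDelayMs;
    private final double multiplier;
    private final long maxDelayMs;

    ExponentialBackoff(long initialDelayMs, double multiplier, long maxDelayMs) {
        if (initialDelayMs <= 0 || multiplier < 1.0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("invalid back-off parameters");
        }
        this.initialDelayMs = initialDelayMs;
        this.multiplier = multiplier;
        this.maxDelayMs = maxDelayMs;
    }

    /**
     * Delay to wait after a failed delivery. deliveryCount is the JMSXDeliveryCount of the
     * failed message (1 for the first delivery); values below 1 are treated as 1.
     */
    long delayFor(int deliveryCount) {
        int failures = Math.max(1, deliveryCount);
        // initial * multiplier^(failures - 1), capped at maxDelayMs; very large exponents
        // become Infinity in double arithmetic, which Math.min also caps.
        double delay = initialDelayMs * Math.pow(multiplier, failures - 1);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        // 1 s initial delay, doubling each time, capped at one hour
        ExponentialBackoff backoff = new ExponentialBackoff(1_000, 2.0, 3_600_000);
        for (int count : new int[] {1, 2, 3, 10, 12, 13, 100}) {
            System.out.println("delivery " + count + " failed -> wait " + backoff.delayFor(count) + " ms");
        }
    }
}
```

Because the input is the delivery count kept by the queue manager, a function like this shares the restart behavior described above: with these example parameters, a message whose 13th delivery fails right after a restart would wait the full one-hour cap, not 1 s. To plug it into the policy, the int delay passed to scheduleWakeup would need to become a long, which java.util.Timer.schedule(TimerTask, long) accepts.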
