Pith

Reputation: 3794

Camel: stop the route when the jdbc connection loss is detected

I have an application with the following route:

from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");

This route receives a new message every 59 milliseconds. I want to stop the route as soon as the connection to the database is lost, before a second message arrives. Above all, I must never lose a message.

I proceeded as follows:

I added an errorHandler:

errorHandler(deadLetterChannel("direct:backup")
.redeliveryDelay(5L)
.maximumRedeliveries(1)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.logExhausted(false));

My errorHandler tries to redeliver the message once and, if that attempt fails too, it redirects the message to a deadLetterChannel.
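The "redeliver once, then dead-letter" behaviour can be pictured without Camel. The following is a minimal plain-Java sketch, not Camel's implementation: the class `RetryThenFallback` and its methods are hypothetical names used only for illustration, and the parameters mirror `maximumRedeliveries(1)` and `redeliveryDelay(5L)` from the configuration above.

```java
import java.util.function.Consumer;

/** Illustrative only: "retry N times, then hand over to a dead letter consumer". */
final class RetryThenFallback {

    private final int maximumRedeliveries;
    private final long redeliveryDelayMillis;
    private final Consumer<String> deadLetter;

    RetryThenFallback(int maximumRedeliveries, long redeliveryDelayMillis,
                      Consumer<String> deadLetter) {
        this.maximumRedeliveries = maximumRedeliveries;
        this.redeliveryDelayMillis = redeliveryDelayMillis;
        this.deadLetter = deadLetter;
    }

    /** Returns true if the target accepted the message, false if it was dead-lettered. */
    boolean deliver(String message, Consumer<String> target) {
        // One initial attempt plus up to maximumRedeliveries retries.
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            if (attempt > 0 && !pause()) {
                break; // interrupted while waiting: stop retrying
            }
            try {
                target.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Delivery failed; loop again if attempts remain.
            }
        }
        // All attempts failed: the message is handed over, not dropped.
        deadLetter.accept(message);
        return false;
    }

    private boolean pause() {
        try {
            Thread.sleep(redeliveryDelayMillis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With `maximumRedeliveries = 1`, a failing target is called twice before the message reaches the dead letter consumer, so with a 5 ms delay the failure is detected well within the 59 ms between two messages, provided each failed attempt itself returns quickly.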

The following deadLetterChannel will stop the tcp.input route and try to redeliver the message to the database:

RoutePolicy policy = new StopRoutePolicy();
from("direct:backup")
.routePolicy(policy)
.errorHandler(
  defaultErrorHandler()
  .redeliveryDelay(1000L)
  .maximumRedeliveries(-1)
  .retryAttemptedLogLevel(LoggingLevel.ERROR)
)
.to("jdbc:mydb");

Here is the code of the routePolicy:

public class StopRoutePolicy extends RoutePolicySupport {

  private static final Logger LOG = LoggerFactory.getLogger(StopRoutePolicy.class);

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
      try {
        exchange.getContext().getInflightRepository().remove(exchange);
        LOG.info("STOP ROUTE: {}", stop);
        context.stopRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }

}

My problems with this method are:

Does anybody have an idea how to make this faster, or how to do it differently, so that no message is lost?

Upvotes: 2

Views: 1540

Answers (1)

Pith

Reputation: 3794

I finally found a way to solve my problems. To make the application faster, I added asynchronous processing and multithreading with seda.

from("netty:tcp://localhost:5150?sync=false&keepAlive=true").to("seda:break");


from("seda:break").threads(5)
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");
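The idea behind putting `seda:break` between the TCP consumer and the JDBC insert is that the receiver only enqueues the message and returns, while a pool of worker threads drains the queue. The following plain-Java sketch shows that hand-off pattern; it is not Camel code, and `StagedHandoff` and its methods are hypothetical names chosen for illustration.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: an in-memory queue decoupling a fast receiver from slower workers. */
final class StagedHandoff {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Queue<String> processed = new ConcurrentLinkedQueue<>();
    private final ExecutorService workers;

    StagedHandoff(int workerCount) {
        workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.submit(this::drain);
        }
    }

    /** Called by the receiver; returns as soon as the message is queued. */
    void accept(String message) {
        queue.add(message);
    }

    /** Worker loop: take a message and process it (here it is only recorded). */
    private void drain() {
        try {
            while (true) {
                String message = queue.take();
                processed.add(message); // stand-in for the database insert
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown requested
        }
    }

    /** Waits until the expected number of messages is processed or the timeout expires. */
    boolean awaitProcessed(int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (processed.size() < expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    void shutdown() {
        workers.shutdownNow();
    }
}
```

Note that a queue like this lives only in memory, as Camel's seda queue does: messages still waiting in it are lost if the JVM terminates, so it protects against a slow or unavailable database, not against a crash of the application itself.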

I did the same with the backup route.

from("seda:backup")
.routePolicy(policy)
.errorHandler(
  defaultErrorHandler()
  .redeliveryDelay(1000L)
  .maximumRedeliveries(-1)
  .retryAttemptedLogLevel(LoggingLevel.ERROR)
).threads(2).to("jdbc:mydb");

And I modified the routePolicy as follows:

public class StopRoutePolicy extends RoutePolicySupport {

  private static final Logger LOG = LoggerFactory.getLogger(StopRoutePolicy.class);

  @Override
  public void onExchangeBegin(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
      try {
        exchange.getContext().getInflightRepository().remove(exchange);
        LOG.info("STOP ROUTE: {}", stop);
        context.stopRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStopped()) {
      try {
        LOG.info("RESTART ROUTE: {}", stop);
        context.startRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }
}

With these updates, the tcp.input route is stopped before the backup route starts processing the failed message, and it is restarted once the jdbc connection is back and the backup delivery has succeeded.

Now, thanks to Camel, the application can handle a database failure without losing messages and without manual intervention.

I hope this helps.
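Because the backup route runs on two threads, the policy callbacks may be invoked concurrently, and the "check the status, then stop" sequence is not atomic. One way to think about the stop/restart sequencing is a gate that is closed when a backup exchange begins and reopened when it is done. The sketch below is plain Java, not Camel API; `IntakeGate` and its method names are hypothetical, and it assumes the caller would only invoke `stopRoute` or `startRoute` when the corresponding method returns true.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: ensures a single caller performs each stop and each restart. */
final class IntakeGate {

    // true = intake route running, false = intake route stopped
    private final AtomicBoolean open = new AtomicBoolean(true);

    /** A backup exchange begins. Returns true only for the caller that closed the gate. */
    boolean onBackupBegin() {
        return open.compareAndSet(true, false);
    }

    /** A backup exchange is done. Returns true only for the caller that reopened the gate. */
    boolean onBackupDone() {
        return open.compareAndSet(false, true);
    }

    boolean isOpen() {
        return open.get();
    }
}
```

The compare-and-set makes the transition atomic, so when two backup threads hit the failure at the same moment, only one of them would go on to stop the route.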

Upvotes: 3
