enkor
enkor

Reputation: 7557

Using a timing based PollingConsumer to a direct endpoint

Functionally I wish to check a URL is active before I consume from a JMS (WMQ) endpoint.
If the URL cannot be reached or a server error, then I do not want to pick up from the queue. So I want to keep trying (with unlimited retries) the URL via a polling consumer. So as soon as it is available I can pick up from JMS.

I have a RouteBuilder that is set up with a direct endpoint, that is configured to run a Processor that will ping a service.

So:

public class PingRoute extends RouteBuilder {
        @Override
        public void configureCamel() {
            from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
                .to("log://PingRoute?showAll=true");
        }
    }

In another route I am setting up my timer:

    @Override
public void configureCamel() {
       from(timerEndpoint).beanRef(PollingConsumerBean.class.getSimpleName(), "checkPingRoute");
   ...
    }

And with the PollingConsumerBean I am attempting to receive the body via a consumer:

public void checkPingRoute(){
    // loop to check the consumer.  Check we can carry on with the pick up from the JMS queue.
    while(true){
        Boolean pingAvailable = consumer.receiveBody("direct:pingRoute", Boolean.class);
   ...
    }

I add the route to the context and use a producer to send:

context.addRoutes(new PingRoute());
context.start();
producer.sendBody(TimerPollingRoute.TIMER_POLLING_ROUTE_ENDPOINT, "a body");

And I get the following IllegalArgumentException:

Cannot add a 2nd consumer to the same endpoint. Endpoint Endpoint[direct://pingRoute] only allows one consumer.

Is there a way to setup the direct route as a polling consumer?

Upvotes: 2

Views: 1705

Answers (4)

enkor
enkor

Reputation: 7557

All the answers here pointed me on the right direction but I finally came up with a solution that managed to fit our code base and framework.

Firstly, I discovered there isn't a need to have bean to act as a polling consumer but a processor could be used instead.

@Override
public void configureCamel() {
    from("timer://fnzPoller?period=2000&delay=2000").processRef(UrlPingProcessor.class.getSimpleName())
           .processRef(StopStartProcessor.class.getSimpleName()).to("log://TimerPollingRoute?showAll=true");

}

Then in the UrlPingProcessor there is CXF service to ping the url and can check the response :

@Override
public void process(Exchange exchange) {
    try {
        // CXF service
        FnzPingServiceImpl fnzPingService = new FnzPingServiceImpl(url);
        fnzPingService.getPing();
    } catch (WebApplicationException e) {
        int responseCode = e.getResponse().getStatus();
        boolean isValidResponseCode = ResponseCodeUtil.isResponseCodeValid(responseCode);
        if (!isValidResponseCode) {
            // Sets a flag to stop for the StopStartProcessor
            stopRoute(exchange);
        }
    } 
}

Then in the StopStartProcessor it is using a ExecutorService to stop or start a route via new thread.:

    @Override
public void process(final Exchange exchange) {
    // routeBuilder is set on the constructor.
    final String routeId = routeBuilder.getClass().getSimpleName();
    Boolean stopRoute = ExchangeHeaderUtil.getHeader(exchange, Exchange.ROUTE_STOP, Boolean.class);
    boolean stopRoutePrim = BooleanUtils.isTrue(stopRoute);
    if (stopRoutePrim) {
        StopRouteThread stopRouteThread = new StopRouteThread(exchange, routeId);
        executorService.execute(stopRouteThread);
    } else {
        CamelContext context = exchange.getContext();
        Route route = context.getRoute(routeId);
        if (route == null) {
            try {
                context.addRoutes(routeBuilder);
            } catch (Exception e) {
                String msg = "Unable to add a route: " + routeBuilder;
                LOGGER.warn(msg, e);
            }
        }
    }
}

Upvotes: 0

raulk
raulk

Reputation: 2859

Based on the OP's clarification of their use case, they have several problems to solve:

  • Consume the message from the JMS queue if, and only if, the ping to the URL is positive.
  • If the URL is unresponsive, the JMS message should not disappear from the queue and a retry must take place until the URL becomes responsive again, in which case the message will be ultimately consumed.
  • The OP has not specified if the amount of retries is limited or unlimited.

Based on this problem scenario, I suggest a redesign of their solution that leverages ActiveMQ retries, broker-side redelivery and JMS transactions in Camel to:

  1. Return the message to the queue if the URL ping failed (via a transaction rollback).
  2. Ensure that the message is not lost (by using JMS persistence and broker-side redeliveries, AMQ will durably schedule the retry cycle).
  3. Be able to specify a sophisticated retry cycle per message, e.g. with exponential backoffs, maximum retries, etc.
  4. Optionally sending the message to a Dead Letter Queue if the retry cycle was exhausted without a positive result, so that some other (possibly manual) action can be planned.

Now, implementation-wise:

from("activemq:queue:abc?transacted=true")          // (1)
    .to("http4://host.endpoint.com/foo?method=GET") // (2) (3)
    .process(new HandleSuccess());                  // (4)

Comments:

  1. Note the transacted flag.
  2. If the HTTP invocation fails, the HTTP4 endpoint will raise an Exception.
  3. Since there are no configured exception handlers, Camel will propagate the exception to the consumer endpoint (activemq) which will rollback the transaction.
  4. If the invocation succeeded, the flow will continue and the exchange body will now contain the payload returned by the HTTP server and you can handle it in whichever way you wish. Here I'm using a processor.

Next, what's important is that you configure the redelivery policy in ActiveMQ, as well as enable broker-side redeliveries. You do that in your activemq.xml configuration file:

<plugins>
  <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
    <redeliveryPolicyMap>
      <redeliveryPolicyMap>
        <redeliveryPolicyEntries>
          <redeliveryPolicy queue="my.queue" 
                            initialRedeliveryDelay="30000" 
                            maximumRedeliveries="17" 
                            maximumRedeliveryDelay="259200000" 
                            redeliveryDelay="30000" 
                            useExponentialBackOff="true"
                            backOffMultiplier="2" />
        </redeliveryPolicyEntries>
      </redeliveryPolicyMap>
    </redeliveryPolicyMap>
  </redeliveryPlugin>
</plugins>

And make sure that the scheduler support is enabled in the top-level <broker /> element:

<broker xmlns="http://activemq.apache.org/schema/core" 
        brokerName="mybroker" 
        schedulerSupport="true">
    ...
</broker>

I hope that helps.

EDIT 1: OP is using IBM WebSphere MQ as a broker, I missed that. You could use a JMS QueueBrowser to peek at messages and try their corresponding URLs before actually consuming a message, but it is not possible to selectively consume an individual message – that's not what MOM (messaging-oriented middleware) is about.

So I insist that you should explore JMS transactions, but rather than leaving it up to the broker to redeliver the message, you can start the pinging cycle to the URL within the TX body itself. With regards to Camel, you could implement it as follows:

from("jms:queue:myqueue?transacted=true")
    .bean(new UrlPinger());

UrlPinger.java:

public class UrlPinger {

    @EndpointInject
    private ProducerTemplate template;

    private Pattern pattern = Pattern.compile("^(http(?:s)?)\\:");

    @Handler
    public void pingUrl(@Body String url, CamelContext context) throws InterruptedException {
        // Replace http(s): with http(s)4: to use the Camel HTTP4 endpoint.
        Matcher m = pattern.matcher(url);
        if (m.matches()) {
            url = m.replaceFirst(m.group(1) + "4:");
        }

        // Try forever until the status code is 200.
        while (getStatusCode(url, context) != 200) {
            Thread.sleep(5000);
        }
    }

    private int getStatusCode(String url, CamelContext context) {
        Exchange response = template.request(url + "?method=GET&throwExceptionOnFailure=false", new Processor() {
            @Override public void process(Exchange exchange) throws Exception {
                // No body since this is a GET request.
                exchange.getIn().getBody(null);
            }
        });

        return response.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
    }

}

Notes:

  1. Note the throwExceptionOnFailure=false option. An Exception will not be raised, therefore the loop will execute until the condition is true.
  2. Inside the bean, I'm looping forever until the HTTP status is 200. Of course, your logic will be different.
  3. Between attempt and attempt, I'm sleeping 5000ms.
  4. I'm assuming the URL to ping is in the body of the incoming JMS message. I'm replacing the leading http(s): with http(s)4: in order to use the Camel HTTP4 endpoint.
  5. Performing the pinging inside the TX guarantees that the message will only be consumed once the ping condition is true (in this case HTTP status == 200).
  6. You might want to introduce a desist condition (you don't want to keep trying forever). Maybe introduce some backoff to not overwhelm the other party.
  7. If either Camel or the broker goes down within a retry cycle, the message will be automatically rolled back.
  8. Take into account that JMS transactions are Session-bound, so if you want to start many concurrent consumers (concurrentConsumers JMS endpoint option), you'll need to set cacheLevelName=CACHE_NONE for each thread to use a different JMS Session.

Upvotes: 1

Alexey Yakunin
Alexey Yakunin

Reputation: 1771

Business logic is not quite clear, unfortunately. As I understand it - you need to wait for a response from the service. IMHO you have to use Content Enricher EIP http://camel.apache.org/content-enricher.html . pollEnrich is what you need at timer route.

.pollEnrich("direct:waitForResponce", -1) or .pollEnrich("seda:waitForResponce", -1)

public class PingRoute extends RouteBuilder {
        @Override
        public void configureCamel() {
             from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
             .choice().when(body())                       
                  .to("log://PingRoute?showAll=true")
                  .to("direct:waitForResponce") 
                .otherwise()
                  .to("direct:pingRoute")
                .end(); 
        }
};

timer:

    @Override
    public void configureCamel() {                           
      from(timerEndpoint)
      .inOnly("direct:pingRoute")
      .pollEnrich("direct:waitForResponce", -1)
       ...
    }

Upvotes: 1

Matthew Fontana
Matthew Fontana

Reputation: 3870

I am having a bit of difficulty figuring out exactly what you want to do, but it appears to me that you want to consume data from an endpoint on an interval. For this the best pattern is a polling consumer: http://camel.apache.org/polling-consumer.html

The error you are currently receiving is because you have two consumers both trying to read from the "direct://pingRoute" If this was intended you could change the direct to a seda://pingRoute so its an in memory queue your data will be in.

Upvotes: 0

Related Questions