user11734281
user11734281

Reputation: 11

How do I retry failed messages forever in Camel?

I have a scenario where my camel route listens to KAFKA notification messages, forwards them to a logging service which is queue based and then to a REST endpoint. In case of failure for example, if the message could be delivered to the REST endpoint, then the message should be retried infinitely till the endpoint is up. Here i want the logging to be disabled. For e.g in case the exchange is delivered again, then the message should not be route to the to endpoint of the logging service. Also, i have put up a code of the onException block. In the onException block, the error message should only be logged for the first time. How can i achieve this ?

from(<<kafka endpoint>>)
                .process(exchange -> {

                    //set exchange headers and message properties
                    exchange.setProperty("EndpointURI", exchange.getFromEndpoint().getEndpointUri());
                    String[][] messageProperties = setHeadersAsMessageProperties(exchange);
                
                    //Generate xml message and set it to the body of the exchange
                    String xmlPayload = log.generateLoggingMessage(trackingID, exchange.getIn().getBody(String.class), fromComponentName, fromEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
                    exchange.getIn().setBody(xmlPayload);

                })
                .removeHeaders("kafka*")
                .removeHeaders("camel*")

                //Send the exchange to the logging queue
                .to(<<jms endpoint>>)
                .process(exchange -> {
                    //set exchange headers and message properties
                    String xmlPayload = log.generateLoggingMessage(exchange.getIn().getHeader("ESB_TrackingId", String.class), exchange.getProperty("InitialBody", String.class), toComponentName, toEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
                    exchange.getIn().setBody(xmlPayload);

                })
                //Send the exchange to the logging queue
                 .to(<<jms endpoint>>)
                .removeHeaders("cameljms*")
                .removeHeaders("camel*")
                .process(exchange -> {
                    String originalBody = exchange.getProperty("InitialBody", String.class);
                    exchange.getIn().setBody(originalBody);
                })
                .to(<<rest endpoint>>)

    

Upvotes: 0

Views: 114

Answers (1)

You should create a redelivery policy, with number of retries set to -1.

protected RedeliveryPolicyDefinition createRedeliveryPolicy(String maximumRedeliveries,
                                                            String redeliveryDelay) {
    RedeliveryPolicyDefinition redeliveryPolicy = new RedeliveryPolicyDefinition();
    redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
    redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
    redeliveryPolicy.retryAttemptedLogLevel(LoggingLevel.WARN);
    redeliveryPolicy.logRetryStackTrace(false);
    return redeliveryPolicy;
}

Where the parameter maximumRedeliveries is set to -1.

An example would be :

    route
                .onException(Exception.class)
                .handled(true)
                .process(this::logException)
                .onRedelivery(this::logAttemptRedelivery)
     .setRedeliveryPolicyType(
createRedeliveryPolicy(maxRedeliveryRetries, redeliveryDelay)
);

Upvotes: 1

Related Questions