Reputation: 11
I have a scenario where my Camel route listens for Kafka notification messages, forwards them to a queue-based logging service, and then sends them to a REST endpoint. In case of failure, for example if the message could not be delivered to the REST endpoint, the message should be retried indefinitely until the endpoint is up again. During these retries I want the logging to be disabled: if the exchange is delivered again, the message should not be routed to the "to" endpoint of the logging service. I also have an onException block, and in it the error message should only be logged the first time. How can I achieve this?
from(<<kafka endpoint>>)
    .process(exchange -> {
        // set exchange headers and message properties
        exchange.setProperty("EndpointURI", exchange.getFromEndpoint().getEndpointUri());
        String[][] messageProperties = setHeadersAsMessageProperties(exchange);
        // Generate xml message and set it to the body of the exchange
        String xmlPayload = log.generateLoggingMessage(trackingID, exchange.getIn().getBody(String.class), fromComponentName, fromEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
        exchange.getIn().setBody(xmlPayload);
    })
    .removeHeaders("kafka*")
    .removeHeaders("camel*")
    // Send the exchange to the logging queue
    .to(<<jms endpoint>>)
    .process(exchange -> {
        // set exchange headers and message properties
        String xmlPayload = log.generateLoggingMessage(exchange.getIn().getHeader("ESB_TrackingId", String.class), exchange.getProperty("InitialBody", String.class), toComponentName, toEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
        exchange.getIn().setBody(xmlPayload);
    })
    // Send the exchange to the logging queue
    .to(<<jms endpoint>>)
    .removeHeaders("cameljms*")
    .removeHeaders("camel*")
    .process(exchange -> {
        String originalBody = exchange.getProperty("InitialBody", String.class);
        exchange.getIn().setBody(originalBody);
    })
    .to(<<rest endpoint>>)
Upvotes: 0
Views: 114
Reputation: 628
You should create a redelivery policy with the maximum number of redeliveries set to -1, which Camel treats as "redeliver forever".
protected RedeliveryPolicyDefinition createRedeliveryPolicy(String maximumRedeliveries,
                                                            String redeliveryDelay) {
    RedeliveryPolicyDefinition redeliveryPolicy = new RedeliveryPolicyDefinition();
    redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
    redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
    redeliveryPolicy.retryAttemptedLogLevel(LoggingLevel.WARN);
    redeliveryPolicy.logRetryStackTrace(false);
    return redeliveryPolicy;
}
Here the maximumRedeliveries parameter is passed as -1.
An example would be:
route
    .onException(Exception.class)
    .handled(true)
    .process(this::logException)
    .onRedelivery(this::logAttemptRedelivery)
    .setRedeliveryPolicyType(
        createRedeliveryPolicy(maxRedeliveryRetries, redeliveryDelay)
    );
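To make the intended behaviour concrete, here is a minimal plain-Java sketch of what such a policy amounts to. It does not use Camel at all: RetrySketch, deliver, and the "audit:" / "error:" log entries are hypothetical names made up for illustration, and the in-memory list only stands in for the logging queue. It assumes the logging step runs once before the first attempt, that only the failing call to the REST endpoint is repeated, and that a negative maximumRedeliveries means "retry forever".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the route: log once, then retry only the REST call.
final class RetrySketch {

    /** Outcome of a delivery: number of attempts made and the entries that were logged. */
    static final class Result {
        final int attempts;
        final List<String> log;

        Result(int attempts, List<String> log) {
            this.attempts = attempts;
            this.log = log;
        }
    }

    /**
     * Sends the message to restEndpoint, retrying on failure.
     * maximumRedeliveries < 0 means retry forever; 0 means no redelivery.
     */
    static Result deliver(String message,
                          int maximumRedeliveries,
                          long redeliveryDelayMillis,
                          Consumer<String> restEndpoint) {
        List<String> log = new ArrayList<>();
        // The logging step happens once, before the first attempt, and is never repeated.
        log.add("audit: " + message);

        int attempt = 0;
        while (true) {
            attempt++;
            try {
                restEndpoint.accept(message);
                return new Result(attempt, log);
            } catch (RuntimeException e) {
                // Record the error only for the first failure, not for every redelivery.
                if (attempt == 1) {
                    log.add("error: " + e.getMessage());
                }
                int redeliveriesSoFar = attempt - 1;
                if (maximumRedeliveries >= 0 && redeliveriesSoFar >= maximumRedeliveries) {
                    throw e; // retries exhausted, give up
                }
                try {
                    Thread.sleep(redeliveryDelayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to redeliver", ie);
                }
            }
        }
    }
}
```

In a real route the same "first time only" check could probably be made inside logException by looking at the redelivery counter Camel keeps on the exchange (the CamelRedeliveryCounter header), but whether that fits depends on your Camel version and on how the Kafka consumer is configured, so treat it as a starting point rather than a verified solution.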
Upvotes: 1