Shoaib Khan
Shoaib Khan

Reputation: 939

Camel Idempotent Consumer incorrect behaviour for removeOnFailure=true

I would like to know if the below is expected behaviour for Camel idempotent consumer:

I have removeOnFailure=true for the route, which means basically when the exchange fails idempotent consumer should remove the Identifier from the repository. This brings up a very interesting scenario which allows duplicate on the exchange.

Suppose I have identifier=12345 and first attempt to execute the exchange was Succesfull which means identifier is added to idempotent repository. Next attempt to use same identifier i.e 12345 fails as this is caught as Duplicate Message (CamelDuplicateMessage). But at this point having removeOnFailure=true will remove the identifier from the repository which on next attempt will allow the exchange to go through successfully without catching the default message. Hence, creating a room for duplication on the exchange.

Can someone advise if this is expected behaviour or some bug?

Sample Route:

from("direct:Route-DeDupeCheck").routeId("Route-DeDupeCheck")
            .log(LoggingLevel.DEBUG, "~~~~~~~ Reached to Route-DeDupeCheck: ${property.xref}")
            .idempotentConsumer(simple("${property.xref}"), MemoryIdempotentRepository.memoryIdempotentRepository()) //TODO: To replace with Redis DB for caching
            .removeOnFailure(true)
            .skipDuplicate(false)
            .filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                .log("~~~~~~~ Duplicate Message Found!")
                .to("amq:queue:{{jms.duplicateQueue}}?exchangePattern=InOnly") //TODO: To send this to Duplicate JMS Queue
                .throwException(new AZBizException("409", "Duplicate Message!"));

Upvotes: 2

Views: 1678

Answers (1)

pvpkiran
pvpkiran

Reputation: 27078

Your basic premise is wrong.

Next attempt to use same identifier i.e 12345 fails as this is caught as Duplicate Message (CamelDuplicateMessage)

When there is a duplicated message, it is not considered as a failure. It is just ignored from further processing(unless you have skipDuplicate option set to true).

Hence the scenario what you just explained cannot occur what so ever.

It is very easy to test. Considering you have a route like this,

public void configure() throws Exception {
        //getContext().setTracing(true); Use this to enable tracing

        from("direct:abc")
            .idempotentConsumer(header("myid"),
                MemoryIdempotentRepository.memoryIdempotentRepository(200))
            .removeOnFailure(true)
            .log("Recieved id : ${header.myid}");
    }
}

And a Producer like this

@EndpointInject(uri = "direct:abc")
ProducerTemplate producerTemplate;

for(int i=0, i<5,i++) {
   producerTemplate.sendBodyAndHeader("somebody","myid", "1");
}

What you see in logs is

INFO 18768 --- [tp1402599109-31] route1   : Recieved id : 1

And just once.

Upvotes: 2

Related Questions