Reputation: 71
I have mule flow where I am using JMS polling to poll the queue every 5 minutes and process all the messages in queue. I receive message in queue whenever there is some update to the entity. Whenever I see the message, I pull unique id from message and use that to retrieve the updated entity through web service call. Entity could be updated multiple times so I need to capture all the updates. For example if I have 10 messages in the queue referring to same entity update then I need to ignore 9 messages and consume only one since all the messages are referring to same entity. I tried to use idempotent-message-filter for this. But it works fine for the first time. When application reads the messages second time from queue after 5 minutes(polling frequency) and there are messages(updates) for the same entity then idempotent-message-filter ignore those messages. Ideally application should consume the same messages in future as well during different polling intervals if there are any. It should ignore the messages only if found duplicate during one interval. But if found during the future intervals then it should repeat the same processing(ignore duplicates and process only one) that it performed during first interval.
I hope I am able to explain my issue.
Any help will be highly appreciate.
Thanks, Vijay
Upvotes: 1
Views: 2207
Reputation: 11
<idempotent-message-filter idExpression="#[message:payload]" doc:name="Idempotent Message" throwOnUnaccepted="true" onUnaccepted="ValidationFailFlow">
<in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" ></inmemory-store>
</idempotent-message-filter>
Upvotes: 0
Reputation: 71
Thanks David/Daniel. I used it as follow :
idempotent-message-filter idExpression="XPATH expression to retrieve entity id" > in-memory-store entryTTL="60000"/> /idempotent-message-filter>
Its working fine for me.
Appreciate you guys help on this.
Vijay
Upvotes: 0
Reputation: 33413
The idempotent-message-filter
is what you want to use but you need to configure it with a custom idExpression
:
<idempotent-message-filter idExpression="#[ ... ]" />
The documentation for this expression says:
Defines one or more expressions to use when extracting the ID from the message. For example, it would be possible to combine to headers as the ID of the message to provide idempotency: '#[headers:foo,bar]'. Or, you could combine the message ID with a header: '#[message:id]-#[header:foo]'. If this property is not set, '#[message:id]' will be used by default.
The idea is then to craft an expression that has a component that is the same for one poll (probably a modulo of the system time) and same for messages that are deemed identical in one poll.
Upvotes: 1
Reputation: 958
Try setting entryTTL
for the filter Object Store like:
<idempotent-message-filter idExpression="#[yourExpression]">
<in-memory-store entryTTL="300000"/>
</idempotent-message-filter>
This way the entry used to filter messages expires after five minutes
HTH
Upvotes: 4