user2324686
user2324686

Reputation: 71

ignoring duplicate messages in jms queue

I have mule flow where I am using JMS polling to poll the queue every 5 minutes and process all the messages in queue. I receive message in queue whenever there is some update to the entity. Whenever I see the message, I pull unique id from message and use that to retrieve the updated entity through web service call. Entity could be updated multiple times so I need to capture all the updates. For example if I have 10 messages in the queue referring to same entity update then I need to ignore 9 messages and consume only one since all the messages are referring to same entity. I tried to use idempotent-message-filter for this. But it works fine for the first time. When application reads the messages second time from queue after 5 minutes(polling frequency) and there are messages(updates) for the same entity then idempotent-message-filter ignore those messages. Ideally application should consume the same messages in future as well during different polling intervals if there are any. It should ignore the messages only if found duplicate during one interval. But if found during the future intervals then it should repeat the same processing(ignore duplicates and process only one) that it performed during first interval.

I hope I am able to explain my issue.

Any help will be highly appreciate.

Thanks, Vijay

Upvotes: 1

Views: 2207

Answers (4)

salumiah
salumiah

Reputation: 11

<idempotent-message-filter idExpression="#[message:payload]" doc:name="Idempotent Message" throwOnUnaccepted="true" onUnaccepted="ValidationFailFlow">
<in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" ></inmemory-store>
</idempotent-message-filter>

Upvotes: 0

user2324686
user2324686

Reputation: 71

Thanks David/Daniel. I used it as follow :

idempotent-message-filter idExpression="XPATH expression to retrieve entity id" > in-memory-store entryTTL="60000"/> /idempotent-message-filter>

Its working fine for me.

Appreciate you guys help on this.

Vijay

Upvotes: 0

David Dossot
David Dossot

Reputation: 33413

The idempotent-message-filter is what you want to use but you need to configure it with a custom idExpression:

<idempotent-message-filter idExpression="#[ ... ]" />

The documentation for this expression says:

Defines one or more expressions to use when extracting the ID from the message. For example, it would be possible to combine to headers as the ID of the message to provide idempotency: '#[headers:foo,bar]'. Or, you could combine the message ID with a header: '#[message:id]-#[header:foo]'. If this property is not set, '#[message:id]' will be used by default.

The idea is then to craft an expression that has a component that is the same for one poll (probably a modulo of the system time) and same for messages that are deemed identical in one poll.

Upvotes: 1

Daniel
Daniel

Reputation: 958

Try setting entryTTL for the filter Object Store like:

<idempotent-message-filter idExpression="#[yourExpression]">
  <in-memory-store entryTTL="300000"/>
</idempotent-message-filter>

This way the entry used to filter messages expires after five minutes

HTH

Upvotes: 4

Related Questions