dbf

Reputation: 6499

Batch fetching from JMS queue

I need to implement the following workflow:

  1. Every N milliseconds fetch all available messages from a JMS queue, up to a maximum of K items
  2. Do some processing
  3. Acknowledge all of them once processing is done

I'm interested in how to implement step #1 correctly using JMS.

What I've tried is to:

  1. Create a BlockingQueue of size K
  2. In the onMessage method of my JMS MessageListener put the messages into the BlockingQueue.
  3. Every N milliseconds drain the BlockingQueue, process the messages, and acknowledge them.

It looks fine at first glance, but there is a problem: if onMessage delivers a message while a batch is being processed, that message is also acknowledged when the batch is acknowledged, even though it is still sitting in the BlockingQueue and has not been processed yet (reference). So if my app goes down at that point, the message will never be processed.

What is a better way to implement step #1? Is there something in the JMS API for this, or a standard approach?

Upvotes: 1

Views: 1340

Answers (1)

Justin Bertram

Reputation: 35008

JMS doesn't support "batch fetching." Messages have to be fetched/received one at a time. However, work on multiple messages can be batched together using a transacted session.

Also, by using a MessageListener implementation you're giving up control of when your application receives messages as the onMessage method will be invoked whenever a message is put into the queue.

I recommend you eliminate the MessageListener completely. Instead, every N milliseconds use a transacted javax.jms.Session and invoke javax.jms.MessageConsumer.receive(long) until it returns null (which means no message arrived within the timeout, so the queue is effectively empty) or you have K messages. Then you can process all those messages and invoke javax.jms.Session.commit() when you're done to commit the transacted session and acknowledge all the messages.
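
To make the receive loop concrete, here is a minimal sketch of one polling cycle. So that it compiles without a JMS provider on the classpath, it uses two hypothetical stand-in interfaces, BatchSource and BatchTransaction, which mirror only MessageConsumer.receive(long) and Session.commit()/rollback(). They are not part of JMS, and neither are BatchPoller, receiveBatch or pollOnce. The stand-ins also assume unchecked exceptions, so a real adapter would need to wrap JMSException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the single method used from javax.jms.MessageConsumer.
interface BatchSource<M> {
    M receive(long timeoutMillis); // returns null if nothing arrived within the timeout
}

// Hypothetical stand-in for commit()/rollback() on a transacted javax.jms.Session.
interface BatchTransaction {
    void commit();
    void rollback();
}

final class BatchPoller {
    private BatchPoller() {}

    // Receives messages one at a time until maxBatch is reached or receive returns null.
    static <M> List<M> receiveBatch(BatchSource<M> source, int maxBatch, long timeoutMillis) {
        List<M> batch = new ArrayList<>();
        while (batch.size() < maxBatch) {
            M message = source.receive(timeoutMillis);
            if (message == null) {
                break; // nothing more available right now
            }
            batch.add(message);
        }
        return batch;
    }

    // One polling cycle: receive a batch, process it, then commit.
    // If processing or commit fails, roll back so the whole batch can be redelivered.
    static <M> int pollOnce(BatchSource<M> source, BatchTransaction tx, int maxBatch,
                            long timeoutMillis, Consumer<List<M>> processor) {
        List<M> batch = receiveBatch(source, maxBatch, timeoutMillis);
        if (batch.isEmpty()) {
            return 0; // nothing received, so nothing to commit
        }
        try {
            processor.accept(batch);
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
        return batch.size();
    }
}
```

With a real provider, the session would be created as transacted, for example with Connection.createSession(true, Session.SESSION_TRANSACTED), and pollOnce could be run every N milliseconds from a single thread (for instance via ScheduledExecutorService.scheduleWithFixedDelay), since a JMS Session is not meant to be used from several threads at once.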

Keep in mind that the larger your batch is, the greater the chance that you'll duplicate work. For example, if you receive K messages and process K - 1 of them before some kind of failure prevents you from committing the transacted session, then all K messages will be cancelled back to the queue, potentially redelivered, and processed again later.

Upvotes: 4
