Alexandros
Alexandros

Reputation: 733

jms persistent messages and order of messages obtained

What i need: 1. Send messages through jms. 2. Have messages persisted so that if not handled from client and server restarts client gets the message on server restart. 3. Messages should be received from client in the same order as these messages have been sent. 4. Have multiple consumers in the queue so as to scale and handle different messages in parallel.

What i did: I use active mq and have the following spring configuration

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="destination"/>
</bean>

<bean id="template" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="deliveryMode" value="2"/>
    <property name="defaultDestination" ref="destination"/>
</bean>

<bean id="messageListener" class="InternalNotificationJMSConsumer"/>

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" client-id="client1" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

So i have my destination and my connection pool. Then i create a spring jmstemplate and start sending messages

template.send(new MessageCreator() {
  public Message createMessage(Session session) throws JMSException {
    return session.createObjectMessage(message);
  }
});

Then i have the receiver as follows:

@Component
public class JMSConsumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    if (message instanceof ObjectMessage) {
      ....
     }
   }

My problems:

a. Messages using this configuration are not received in order. This makes sence given the xsd comment in listener-container.concurency: "keep concurrency limited to 1 in case of a topic listener or if message ordering is important;". So question is how/if i can keep order but have multiple consumers.

b. message acknowledgement. What i need is that after message has been processed, onMessage returns, only then message is considered as acknowledged and not re-transmited. I have no transactionmanager in terms that i am not inside any application server and wish to avoid this if possible. i only need to acknowledge messages my self. Is this possible?

Upvotes: 1

Views: 4331

Answers (2)

Alexandros
Alexandros

Reputation: 733

Ok here is the answer to my problem.

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

Then on message listener implementation: BlockingExecutor executor = new BlockingExecutor(5); //number of concurent massages to be consumed

public void onMessage(Message message) {
....
executor.submitTask(new MessageDispatchCommand((ObjectMessage) message));
....

}

class MessageDispatchCommand implements Runnable {
  private ObjectMessage message;

  public MessageDispatchCommand (final ObjectMessage message) {
    this.message = message;
  }

  public void run() {
    try {
      Serializable msg = message.getObject();
      handle message here
     }....
}

BlockingExecutor code can be found from here: Java Concurrency in Practice: BoundedExecutor implementation

Simply put. i have only one jms message listener since this keeps message order correct. Then i create runnable commands to get messages and handle them (here is the time consuming job done but it is made inside different threads). Since i use a boundedexecutor when all message handling threads have been exausted, onMessage blocks and thus subsequent messages are simply persisted untill a thread of the bounded executor becomes free again.

Upvotes: 1

Jakub Korab
Jakub Korab

Reputation: 5024

If you are consuming messages in parallel, then due to the nature of threading there can never be a guarantee that they will be completed in exactly the order they were sent in - only that they will be handed off to consumers in order. If there are only certain messages that absolutely, positively must be ordered (usually related to a transaction/account), you can use Message Groups feature to associate groups with consumers - a kind of sticky load balancing.

Acknowledging messages by yourself is most definitely possible, but probably not needed. The MessageListenerContainer has a acknowledge attribute which if set to transacted will result in the message only being acknowledged as consumed once the MessageListener has completed executing, without an exception being thrown. No transaction manager necessary (though Spring's PlatformTransactionManager doesn't need any particular container if you did want to use one).

If you definitely want to acknowledge messages yourself, set acknowledge to client, and call message.acknowledge() when your message has completed successfully. If the listener completes without this being called, the message will be sent to a DLQ (default) or redelivered.

Upvotes: 1

Related Questions