javatar
javatar

Reputation: 4621

Activemq how to configure a consumer listener (in Java)

I would be happy on the condition that anyone share a simple sample for xml based configuration of consumer listener in Spring. Thanks in advance.

EDIT;

I just would like to hear about the listener of consumer, not the consumer implementation because I have already implemented an active-mq in my app. and it is running well, however I cannot be sure the order of consuming items which are send by producer synchronously. The problem is the inconsistency and data manipulation due to async executions of a method (persisting some objects to db in order to log them) from concurrent consumers at a time.

EDIT2: Let me clarify this complexity. I have an application that consist of two base separate parts. The first one is the synchronously executing Producer that asks db for the products newly come, and then "one by one" send them thorough the "jmsTemplate.send" method provided by active-mq. This was the operation which is synchronously executed from a Cron/Timer. In other words, producer is being executed from a timer/cron. Now The problem is consumer itself. When producer sends products "one by one", async consumers (with concurrency enabled) receive the products and consume them asynchronously.

The problem begins here. Because the method, which is executed from the consumer when the products are just received, does some db persistence operations. When the same product is being received by separate concurrent consumers (it happens because of our system, not a jms issue, dont pay attention on this point) then doing the same persistence operations on the same entity occurs some exceptions as easy to predict. How can I prevent this async operations of products or manage the orders of comsuming products in that kind of application.

Thanks.

Upvotes: 4

Views: 13113

Answers (1)

Eugene
Eugene

Reputation: 120848

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xmlns:p="http://www.springframework.org/schema/p"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                       http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

<!-- A simple and usual connection to activeMQ -->  
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>                

 <!-- A POJO that implements the JMS message listener -->
<bean id="simpleMessageListener" class="MyJMSMessageListener" />


<!-- Cached Connection Factory to wrap the ActiveMQ connetion -->
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">     
    <property name="targetConnectionFactory" ref="activeMQConnectionFactory"></property>
    <property name="sessionCacheSize" value="10"></property>
    <property name="reconnectOnException" value="true"></property>
</bean>


<!-- The Spring message listener container configuration -->
<jms:listener-container container-type="default" connection-factory="cachedConnectionFactory" acknowledge="auto">
    <jms:listener destination="FOO.TEST" ref="simpleMessageListener" method="onMessage" />
</jms:listener-container>

</beans>

And the Java class that listens to the message itslef:

import javax.jms.Message;
import javax.jms.MessageListener;
public class MyJMSMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
    // Do your work here
    }

}

Starting this listener it's a matter of getting the Application Context, it will auto-start the JMS Listener once you do that.

EDIT according to the other question :

So your system may generate (for example) 2 or even more messages delivered to the Consumers with the same productID? Well first this is rather not YOUR problem, but the application's. Even if you somehow you fix it, it is not really a fix, but it is a way to hide the problem itself. Nevertheless, if forced to provide a solution, right now, I can think of only one, the easiest, disable the concurrent consuming, sort of. Here is what I would do: Receive the messages in the Queue and have only one Consumer on that queue. Inside this Consumer I would process as little from the message as I can - take ONLY the productID and place it in some other queue. Before this you would have to always check if the productID is not already in that queue. If it is just return silently, if it is not, that means it has never been processed, thus place this message in a different Queue : Queue2 for example, and then enable concurrent consumers on the second Queue Queue2. This still has flaws though: first the productID queue should somehow be cleaned once in a while, otherwise it will grow forever, but that is that that hard. The tricky part: what if you have a productID in the productID queue, but the product came for an UPDATE in the DB and not INSERT? You should not reject it then...

Upvotes: 7

Related Questions