mahendra kawde
mahendra kawde

Reputation: 847

Persist message in ActiveMQ across server restart

I am learning Spring Integration JMS. Since ActiveMQ is a message broker. I referring to project given here-> http://www.javaworld.com/article/2142107/spring-framework/open-source-java-projects-spring-integration.html?page=2#

But I want to know how can I persist message in ActiveMQ. I mean I started ActiveMQ then send request using REST client. I am calling publishService.send( message ); in for loop for 50 times and and the receiver end I have sleep timer of 10 seconds. So that 50 messages gets queued and its start processing at 10 seconds interval.

EDIT:

Look at the screen shot below:

It says 50 messages enqueued and 5 of them have been dequeued.

enter image description here

But then in between I stopped ActiveMQ server and by the time it has consumed 5 messages out of 50 and then again restart it.

But then I was expecting it to show remaining 45 in Messages Enqueued column. But I can see 0(see screenshot below) there and they all vanished after server restart without having to persist remaining 45 messages. How can I come over this problem ?

enter image description here

Please have a look at the configuration below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
        automagically fines the configured connectionFactory bean (by naming convention
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

Also please see the code:

Controller from where I am sending message in for loop at once:

@Controller
public class MessageController
{
    @Autowired
    private PublishService publishService;

    @RequestMapping( value = "/message", method = RequestMethod.POST )
    @ResponseBody
    public void postMessage( @RequestBody com.geekcap.springintegrationexample.model.Message message, HttpServletResponse response )
    {
        for(int i = 0; i < 50; i++){
            // Publish the message
            publishService.send( message );

            // Set the status to 201 because we created a new message
            response.setStatus( HttpStatus.CREATED.value() );
        }
    }

}

Consumer code to which I have applied timer:

@Service
public class MessageListenerImpl
{
    private static final Logger logger = Logger.getLogger( MessageListenerImpl.class );

    public void processMessage( String message )
    {
        try {
            Thread.sleep(10000);
            logger.info( "Received message: " + message );
            System.out.println( "MessageListener::::::Received message: " + message );
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

Further by searching more I found here that As per the JMS specification, the default delivery mode is persistent. But in my case it does not seems to be worked.

Please help me to have right configuration in place so that messages can be persist across broker failure.

Upvotes: 2

Views: 3144

Answers (1)

Vihar
Vihar

Reputation: 3831

This is typically not a problem, but the way activeMQ was built.

you can find the following explanation in book 'ActiveMQ in action'

  • Once a message has been consumed and acknowledged by a message consumer, it’s typically deleted from the broker’s message store.

So when you restart your server, it only shows you the messages which are in broker's message store. In most cases you will never have the need to have a look at the processed messages.

Hope this helps!

Good luck!

Upvotes: 1

Related Questions