Reputation: 950
I want to implement a solution in Spring-JMS with activeMQ where I want to create a durable subscription to a topic. The purpose is that if a subscriber closes the subscription for a while and once again recreates the durablesubscription with same client id and subscription name, the subscriber should receive all the messages which were delivered during the time subscription was closed.
I want to implement the following logic mentioned in the ORACLE URL for durable subscriptions: https://docs.oracle.com/cd/E19798-01/821-1841/bncgd/index.html
But I am unable to perform this using spring-jms. As per the URL I need to get messageConsumer instance and call close() on that method to stop receiving message temporarily from the topic. But I am not sure how to get it.
Following is my configuration. Kindly let me know how to modify the configuration to perform this.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:userName="admin"
p:password="admin"
p:brokerURL="tcp://127.0.0.1:61616"
primary="true"
></bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="adiTopic"/>
<property name="messageListener" ref="adiListener"/>
</bean>
<bean id="configTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory"
p:defaultDestination-ref="adiTopic" primary="true"
p:pubSubDomain="true">
</bean>
<bean id="adiTopic" class="org.apache.activemq.command.ActiveMQTopic" p:physicalName="gcaa.adi.topic"></bean>
<bean id="adiListener" class="com.gcaa.asset.manager.impl.AdiListener"></bean>
Upvotes: 0
Views: 1299
Reputation: 3913
why not calling DefaultMessageListenerContainer.stop();
to stop the container and consumers ?
you can inject jmsContainer
to another bean and close it when you want and call start() later.
all messages sent to the broker when your durable consumer is offline will be stored until it reconnect.
to make subscription durables you need to add this to jmsContainer
bean
<property name="subscriptionDurable" value="true" />
<property name="cacheLevel" value="1" />
you can add a subscriptionName
or the class name of the specified message listener will be used.
You can add a clientID
to the connectionFactory
<property name="clientID" value="${jms.clientId}" />
or use
<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleConnectionFactory"> <constructor-arg ref="connectionFactory" /> <property name="reconnectOnException" value="true" /> <property name="clientId" value="${jms.clientId}" /> </bean>
and update jmsContainer
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1"> <property name="connectionFactory" ref="singleConnectionFactory" /> <property name="destination" ref="adiTopic" /> <property name="messageListener" ref="adiListener" /> <property name="subscriptionDurable" value="true" /> <property name="cacheLevel" value="1" /> </bean>
UPDATE :
if your adiListener
implements org.springframework.jms.listener.SessionAwareMessageListener
it have to define method onMessage(M message, Session session)
and when you have the session you can call javax.jms.Session.unsubscribe(String subscriptionName)
subscriptionName is defined above and can be injected to this bean or the class name of the specified message listener can be used.
Upvotes: 3