Reputation: 847
I am learning Spring Integration JMS. I ran into a problem where my topic does not persist pending messages which are not yet consumed by the client.
Basically I start ActiveMQ then using REST Client I am invoking producer to send message for 50 times so that 50 messages gets enqueued in topic. At consumer end I have applied sleep timer of 5 seconds so that each message gets consumed at regular interval of 5s. Then in between I stopped ActiveMQ. Meanwhile some messages are consumed by client lets say 15 out of 50 have been consumed. Then If I restart ActiveMQ I was expecting topic to persist pending 35 messages but I can not see that in admin console under topics tab.
Here is my configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:int-jme="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">
<!-- Component scan to find all Spring components -->
<context:component-scan base-package="com.geekcap.springintegrationexample" />
<bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
<property name="order" value="1" />
<property name="messageConverters">
<list>
<!-- Default converters -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
<bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
<bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
<bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
<bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
<bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
</list>
</property>
</bean>
<!-- Define a channel to communicate out to a JMS Destination -->
<int:channel id="topicChannel"/>
<!-- Define the ActiveMQ connection factory -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!--
Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
automagically fines the configured connectionFactory bean (by naming convention
-->
<int-jms:outbound-channel-adapter channel="topicChannel"
destination-name="topic.myTopic"
pub-sub-domain="true" />
<!-- Create a channel for a listener that will consume messages-->
<int:channel id="listenerChannel" />
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="getPayloadChannel"
destination-name="topic.myTopic"
pub-sub-domain="true" />
<int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />
<int:channel id="getPayloadChannel" />
<int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />
</beans>
I also read that default mode is persistent. But in my case it does not seems to be worked.
EDIT:
As per the answer given by Gary Russel after adding attributes
in <int-jms:message-driven-channel-adapter>
I am facing XML related issues
cvc-complex-type.3.2.2: Attribute 'subscription-durable' is not allowed to appear in element 'int-jms:message-driven-channel- adapter'.
cvc-complex-type.3.2.2: Attribute 'durable-subscription-name' is not allowed to appear in element 'int-jms:message-driven-channel- adapter'.
Please Help
Upvotes: 1
Views: 932
Reputation: 813
Topics in Activemq are not durable and persistent, so in case one of your consumer is down. You would lost your messages.
To make topic durable and persistent you can create a durable consumer by creating unique client id per consumer.
But again, that is not distributed in case you are following microservices architecture. So multiple pods or replicas will create problem while consuming messages as in no load balancing is possible for durable consumers.
To mitigate this scenario, there is a option of Virtual topics in Activemq.More details have been provided below,
You can send your messages via your producer in topic named as VirtualTopic.MyTopic. ** Note: you must have to follow this naming convention for default activemq configuration. But yes there is also a way to override this naming convention.
Now, to consume your messages via multiple consumers(A and B here), you have to set naming convention for your consumer side destination as well for eg. Consumer.A.VirtualTopic.MyTopic Consumer.B.VirtualTopic.MyTopic These two consumer will receive messages through the topic created above, also with load balancing enabled between multiple replicas of same consumer.
I hope this will help you fixing your problem with activemq topic.
Upvotes: 0
Reputation: 174494
That is how topics work, by default, read the JMS specification.
Topics are publish/subscribe; only subscribers that are present get to receive the message.
If you publish 5, start the consumer, publish another 5; he will only get the second 5.
If you kill the broker before he gets all 5; during the restart, the broker sees there are no consumers so he purges the messages.
You can change this behavior by using durable subscriptions, in which case the broker will indeed retain messages for each such subscription, even if not currently connected.
To configure this with Spring Integration, set subscription-durable
on the message-driven channel adapter and give it a unique subscription-name
.
Upvotes: 1