Reputation: 149
I am using Spring JMS 4.1.
Since a previous post ([Use of CachingConnectionFactory
with DefaultJmsListenerContainerFactory
) I have been using a CachingConnectionFactory
(setting cacheConsumers
to false
) with the DefaultJmsListenerContainerFactory
:
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="jmsConnectionFactory"
p:cacheConsumers="false"
p:sessionCacheSize="3" />
<bean id="jmsListenerContainerFactory"
class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"
p:connectionFactory-ref="cachedConnectionFactory"
p:destinationResolver-ref="jndiDestinationResolver"
p:concurrency="3-5"
p:backOff-ref="exponentialBackOff"
p:receiveTimeout="1000" />
I am now trying to use my listener class I did define programmatically:
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
...
MyListener receiver = new MyListener();
...
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
adapter.setDefaultListenerMethod("onMessage");
...
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(endpointId);
endpoint.setDestination(...);
endpoint.setMessageListener(adapter);
registrar.registerEndpoint(endpoint);
}
It is working since message are well received by my listener class but logs look weird. After the start, it looks like only one thread is processing messages even if there are several waiting for in the queue:
2014-12-29 15:10:53,978: INFO [DefaultMessageListenerContainer-1] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:10:53,978: INFO [DefaultMessageListenerContainer-3] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:10:53,978: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:10:53,995: INFO [DefaultMessageListenerContainer-2] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:05,318: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:10,052: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:14,506: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:18,825: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:21,715: INFO [DefaultMessageListenerContainer-4] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:21,725: INFO [DefaultMessageListenerContainer-5] [MyListener.instantiateProcess():164] - Processing message
2014-12-29 15:11:25,152: INFO [DefaultMessageListenerContainer-5] [MyListener.instantiateProcess():164] - Processing message
...
DefaultMessageListenerContainer-5 only until all 60 remaining messages have been read...
As you can see eventually only the container #5 is used to process all remaining messages. Messages are not processed in parallel and the defined concurrency does not seem to be used. I don't know if it was the exact same issue but I had a look to this post but I am not using ActiveMQ and I do not have this prefetch option.
Could you please explain why I have this behavior. Is it a Spring misconfiguration or is the issue somewhere else?
Thank you
EDIT:eventually I found an option on the messages broker to not keep messages order and things are getting better. There is a better threads use... Nevertheless I can see that :
DefaultMessageListenerContainer
threads are used (max value in concurrency);Why the maximum number of threads (5) or at least the core number (ie. 3) are not used almost until the queue has been completely emptied?
Upvotes: 0
Views: 2307
Reputation: 1674
Although this question was asked long time ago. I too faced this issue recently. Posting the answer for other's benefit.
One needs to set the prefetch
limit to 1.
You can provide this in the connection string.
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
Reference http://activemq.apache.org/what-is-the-prefetch-limit-for.html
Upvotes: 1