Sai Kumar
Sai Kumar

Reputation: 112

DefaultMessageListenerContainer consuming messages intermittently

I have the following configuration for consuming messages from a queue. I need to make sure that the task executor should perform only one task at a time and hence I have configured the task executor too as follows.

<bean name="jmsTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="2" />
</bean>

When I configure my task Executor as above, 10 messages are getting consumed at a time (there is a huge flow of messages in the queue) and the container stops listening messages for almost 10-15 minutes.My container is configured as follows:

<bean id="queueContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="idleTaskExecutionLimit" value="1" />
    <property name="idleConsumerLimit" value="5" />
    <property name="receiveTimeout" value="10000" />
    <property name="recoveryInterval" value="10000" />
    <property name="taskExecutor" ref="jmsTaskExecutor" />
    <property name="messageListener" ref="queueListener" />
    <property name="autoStartup" value="true" />
</bean>

After doing a little bit of googling, i tried to use SyncTaskExecutor instead of ThreadPoolTaskExecutor and I have configured my taskExecutor as follows:

<bean name="jmsTaskExecutor"
            class="org.springframework.core.task.SyncTaskExecutor" />

but that is causing a memory leak in tomcat.

Can you please let me know how I can achieve the behavior of consuming messages and handling the message to the task only after the task is completed?

Queue Listener Code is as follows:

public class QueueListener implements SessionAwareMessageListener<Message>{
 @override
 public void onMessage(Message msg,Session ses) throws JMSException{
 ....
 ....
 ....
 }
}

Upvotes: 1

Views: 2172

Answers (1)

2020
2020

Reputation: 2841

When I configure my task Executor as above, 10 messages are getting consumed at a time

That suggests that your prefetch size is 10. So, even if you have only one thread processing the messages, that thread takes in 10 messages at a time. If you start your broker with a prefetch size of 0 or 1 (depending on the broker implementation), you will get the "one message at a time" behavior that you desire.

For ex, you can look at this link on setting the prefetch size if you are using ActiveMQ.

If your message processing is taking time then we need to see more of your "onMessage" code to tell where you might be spending time.

Upvotes: 0

Related Questions