Reputation: 5109
I have two Java processes. The first one (the producer) creates messages and puts them onto an ActiveMQ queue. The second one (the consumer) uses Spring Integration to get messages from the queue and processes them in threads.
I have two requirements:
The consumer should have 3 processing threads. If I have 10 messages coming in through the queue, I want to have 3 threads processing the first 3 messages, and the other 7 messages should be buffered.
When the consumer stops while some messages are not yet processed, it should continue processing the messages after a restart.
Here's my config:
<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="example.queue" />
</bean>
<int-jms:message-driven-channel-adapter
destination="messageActiveMqQueue" channel="incomingMessageChannel" />
<int:channel id="incomingMessageChannel">
<int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>
<bean id="incomingMessageChannelExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="daemon" value="false" />
<property name="maxPoolSize" value="3" />
</bean>
<int:service-activator input-channel="incomingMessageChannel"
ref="myMessageProcessor" method="processMessage" />
The first requirement works as expected. I produce 10 messages and 3 myMessageProcessors start processing a message each. As soon as the 1st message has finished, the 4th message is processed.
However, when I kill the consumer before all messages are processed, those messages are lost. After a restart, the consumer does not get those messages again.
I think that's because, in the above configuration, the ThreadPoolTaskExecutor queues the messages internally, so they have already been removed from the incomingMessageChannel (and from the ActiveMQ queue) before they are processed. Hence I tried setting the queue capacity of the incomingMessageChannelExecutor to 0:
<property name="queueCapacity" value="0" />
But now I get error messages when I have more than 3 messages:
2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'
I also tried changing the message-driven-channel-adapter to an inbound-gateway, but this gives me the same error.
Do I have to set an error handler in the inbound-gateway so that the errors go back to the ActiveMQ queue? How do I have to configure the queue so that the messages are kept in the queue if the ThreadPoolTaskExecutor doesn't have a free thread?
Thanks in advance,
Benedikt
Upvotes: 1
Views: 894
Reputation: 174544
No; instead of using an executor channel, you should be controlling the concurrency with the <message-driven-channel-adapter/>.
Remove the <dispatcher/> from the channel and set concurrent-consumers="3" on the adapter.
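As a rough sketch of that change, reusing the bean and channel names from the question (untested against your setup; the namespace declarations and the myMessageProcessor bean are assumed to be the same as in your existing config):

```xml
<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="example.queue" />
</bean>

<!-- Concurrency is handled by the JMS listener container: 3 consumers -->
<int-jms:message-driven-channel-adapter
    destination="messageActiveMqQueue"
    channel="incomingMessageChannel"
    concurrent-consumers="3" />

<!-- Plain direct channel: no <int:dispatcher task-executor="..."/> -->
<int:channel id="incomingMessageChannel" />

<!-- Runs on the listener container's consumer thread that received the message -->
<int:service-activator input-channel="incomingMessageChannel"
    ref="myMessageProcessor" method="processMessage" />
```

With a direct channel there is no in-memory hand-off to an executor, so each of the 3 consumers only takes the next message once processMessage has returned, and the incomingMessageChannelExecutor bean is no longer needed. Whether a message that was in flight when the consumer was killed gets redelivered depends on the acknowledge/transaction settings of the adapter, which are not shown here.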
Upvotes: 2