Reputation: 63
I have a case where I want to run DefaultMessageListenerContainer in the same 'main' thread. Right now it uses SimpleAsyncTaskExecutor, which spawns a new thread every time it receives a message.
We have a test case that connects to a different distributed system, does some processing, and asserts a few things at the end. Because DefaultMessageListenerContainer runs in a separate thread, the main thread returns and starts executing the assertions before DefaultMessageListenerContainer has finished, so the test case fails. As a workaround, we make the main thread sleep for a few seconds.
Sample config:
<int-jms:message-driven-channel-adapter
    id="mq.txbus.publisher.channel.adapter"
    container="defaultMessageListenerContainer"
    channel="inbound.endpoint.publisher"
    acknowledge="transacted"
    extract-payload="true" />
<beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="taskExecutor" ref="taskExecutor"/>
    <beans:property name="maxMessagesPerTask" value="10"/>
    <beans:property name="sessionTransacted" value="true"/>
</beans:bean>
<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />
I am trying TimerTaskExecutor here because it creates a single thread, but that thread is still separate from the main thread, so the problem remains. I also tried SyncTaskExecutor, but that did not work either (or maybe I didn't provide the correct property value?).
Answer:
We solved this problem by using SimpleMessageListenerContainer.
This is the new config:
<int-jms:message-driven-channel-adapter
    id="mq.txbus.publisher.channel.adapter"
    container="messageListenerContainer"
    channel="inbound.endpoint.publisher"
    acknowledge="transacted"
    extract-payload="true" />
<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="sessionTransacted" value="true"/>
    <beans:property name="exposeListenerSession" value="false"/>
</beans:bean>
Upvotes: 2
Views: 7248
Reputation: 1317
If it's for testing purposes, why not use a CyclicBarrier so that the assertions run only once the JMS activity has completed?
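A rough sketch of the CyclicBarrier idea, using only java.util.concurrent. The class name, the method name, and the plain thread standing in for the JMS listener are all hypothetical; in a real test the listener callback would call `barrier.await(...)` after it finishes handling the message.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class BarrierSketch {
    // Starts a stand-in "listener" thread and blocks until it reaches the barrier.
    // Returns whatever the listener recorded; throws if it does not arrive in time.
    static String processAndAwait(long timeoutMs) {
        CyclicBarrier barrier = new CyclicBarrier(2); // two parties: test thread + listener
        AtomicReference<String> result = new AtomicReference<>();

        Thread listener = new Thread(() -> {
            try {
                result.set("processed"); // assumption: stands in for real message handling
                barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException | TimeoutException e) {
                // the test thread gave up first; nothing more to do here
            }
        });
        listener.setDaemon(true);
        listener.start();

        try {
            // Blocks until the listener has also called await(), or the timeout expires.
            barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listener", e);
        } catch (BrokenBarrierException | TimeoutException e) {
            throw new IllegalStateException("listener did not finish in time", e);
        }
        return result.get(); // safe to assert on from here
    }
}
```

The timed `await` keeps the test from hanging forever if the message never arrives. A `CountDownLatch` with a count of 1 would work just as well when only the test thread needs to wait.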
Upvotes: 0
Reputation: 340963
First, you must understand that JMS is inherently asynchronous and does not block. Once you send a message to a queue, it will be processed by another thread, maybe on a different machine, possibly minutes or hours later if the consumer is down.
From the description of your test case, it looks like you are doing system/integration testing. Unfortunately, there is not much you can do except wait. However, you should not wait blindly: that makes your tests both slow and unstable. No matter how long you wait, on a busy system or during a lengthy GC pause your test might still time out even though there is no error.
So instead of sleeping for a fixed number of seconds, sleep for e.g. ~100 milliseconds and then check some condition that is only met once the message has been processed, repeating until it holds. For example, if processing the message inserts a record into the database, check the database periodically.
A much more elegant way (without busy waiting) is to implement the request/reply pattern; see How should I implement request response with JMS? for implementation details. Basically, when sending a message you define a reply queue and block waiting for a message on that queue. When processing of the original message is done, the consumer sends a reply message to the defined queue. When you receive that message, perform all the assertions.
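The polling approach could look roughly like the helper below. `PollingSketch` and `waitUntil` are made-up names, and the condition is whatever check fits your test (a database query, a counter, and so on); this is a sketch, not a library API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class PollingSketch {
    // Re-checks the condition every intervalMs until it holds or timeoutMs has elapsed.
    // Returns true if the condition was met, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long intervalMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // overall timeout reached
            }
            try {
                Thread.sleep(intervalMs); // short nap between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In a test you would call something like `waitUntil(() -> recordExists(id), 10_000, 100)` and fail if it returns false, where `recordExists` is a hypothetical check of your own. The test then finishes as soon as the condition holds instead of always paying the full sleep.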
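To show the shape of request/reply without pulling in a JMS provider, here is an in-memory analogy in which two `BlockingQueue`s stand in for the request and reply destinations. All names are hypothetical, and this is not JMS code; with real JMS you would set a reply destination on the outgoing message and block on a consumer for that destination instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RequestReplySketch {
    // Sends one request, then blocks until the consumer replies or the timeout expires.
    // Returns the reply, or null if none arrived in time.
    static String sendAndAwaitReply(String payload, long timeoutMs) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(); // stand-in for the request queue
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();   // stand-in for the reply queue

        // Stand-in consumer: takes the request, "processes" it, then sends a reply.
        Thread consumer = new Thread(() -> {
            try {
                String request = requestQueue.take();
                replyQueue.put("done:" + request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            requestQueue.put(payload);
            // Blocking wait with a timeout; no polling loop needed.
            return replyQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The test thread wakes up as soon as the reply is available, so assertions can run immediately after `sendAndAwaitReply` returns a non-null value.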
Upvotes: 1