Reputation: 467
I was experimenting with the example from https://github.com/spring-projects/spring-integration-samples/commit/3c855f82047c2e5f639bbec47ad44b4782b366da, and instead of this line:
<int:splitter input-channel="processChannel" output-channel="process" order="1" />
I added the following:
<int:splitter input-channel="processChannel" output-channel="someCheckGate" order="1"/>
<int:channel id="someCheckGate"/>
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="aggregatorChannel" expression="1 eq 1">
    <int:mapping value="true" channel="aggregatorChannel"/>
    <int:mapping value="false" channel="aggregatorChannel"/>
</int:router>
(Both true and false point to the same channel, just to illustrate my question.) What I am trying to do is bypass the process channel based on some condition (on headers or payload; at the moment it is always true). The bypass works and the aggregation works, but the thread then hangs for the configured timeout after BarrierMessageHandler.trigger is called:
if (!syncQueue.offer(message, timeout, TimeUnit.MILLISECONDS))
and then continues on the next line:
this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
syncQueue size is 0.
After the thread is back it tries to release but then it hangs again on:
Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
It gets null and finally throws the ReplyRequiredException on barrier timeout. Seems like quite nice logic here... :) Can you please advise what I am missing? Why doesn't this router fit here?
Upvotes: 2
Views: 470
Reputation: 174664
The problem is that, in this scenario, you are attempting to release the barrier on the same thread as the one you want to suspend.
In this case (since process is skipped for all splits), the release occurs before the suspend.
The way the barrier works is if the release occurs first, that thread waits until the suspending thread arrives. Since it's the same thread, it will never happen.
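The same-thread case can be reproduced in isolation. This is only a sketch, assuming syncQueue behaves like a java.util.concurrent.SynchronousQueue (which would match the size of 0 reported in the question). The class and the release/suspend helpers are hypothetical stand-ins, not the framework's code.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SameThreadBarrierDemo {

    // Hypothetical stand-in for the release step: hand the message over, waiting up to timeoutMs.
    static boolean release(SynchronousQueue<String> syncQueue, String message, long timeoutMs) {
        try {
            return syncQueue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Hypothetical stand-in for the suspend step: wait up to timeoutMs for a released message.
    static String suspend(SynchronousQueue<String> syncQueue, long timeoutMs) {
        try {
            return syncQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> syncQueue = new SynchronousQueue<>();

        // Release first, on the only thread there is: nobody can be waiting in poll(),
        // so offer() blocks for the whole timeout and then gives up.
        boolean released = release(syncQueue, "aggregated", 200);

        // Now the same thread suspends: the earlier offer is long gone, so poll() times out too.
        String received = suspend(syncQueue, 200);

        // prints: released=false, received=null, size=0
        System.out.println("released=" + released + ", received=" + received
                + ", size=" + syncQueue.size());
    }
}
```

A SynchronousQueue has no capacity, so a hand-off only succeeds while a producer and a consumer are blocked in it at the same time. One thread cannot be both.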
Notice that process is a queue channel, so those messages are handed off to another thread. Hence you need another queue channel for the "skipped" splits:
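Why a hand-off to another thread helps can be sketched with plain java.util.concurrent, again assuming a SynchronousQueue-style rendezvous. The class and method names are hypothetical, and the single-thread executor merely plays the role of the poller thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandOffBarrierDemo {

    // The calling thread suspends while a second thread performs the release.
    static String suspendWhileOtherThreadReleases(String message, long timeoutMs) {
        SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
        ExecutorService releaser = Executors.newSingleThreadExecutor();
        try {
            // The release runs on the executor's thread, like a message taken off a queue channel.
            releaser.execute(() -> {
                try {
                    syncQueue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The caller is free to wait here, so both sides can meet within the timeout.
            return syncQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            releaser.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: aggregated
        System.out.println(suspendWhileOtherThreadReleases("aggregated", 2000));
    }
}
```

It does not matter which side arrives first, as long as the other side arrives on a different thread before the timeout expires.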
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="toAgg" expression="1 eq 1">
    <int:mapping value="true" channel="toAgg"/>
    <int:mapping value="false" channel="toAgg"/>
</int:router>
<int:channel id="toAgg">
    <int:queue />
</int:channel>
<int:bridge input-channel="toAgg" output-channel="aggregatorChannel">
    <int:poller fixed-delay="1000" />
</int:bridge>
Also, that example is written to expect exceptions; since they are all successful, the framework wants to send a reply.
See this commit for a full working version with the router in place.
Another solution would be to make aggregatorChannel an ExecutorChannel:
<int:channel id="aggregatorChannel">
    <int:dispatcher task-executor="exec" />
</int:channel>
Upvotes: 1