Reputation: 93
I am receiving data from multiple sources in different threads. I plan to pass the data to a single channel and have a separate thread process the data from this channel. My context is as follows:
<task:executor id="singleThreadedExecutor" pool-size="1" />
<int:channel id="entryChannel">
<int:dispatcher task-executor="singleThreadedExecutor"/>
</int:channel>
<int:header-enricher input-channel="entryChannel" output-channel="processDataChannel">
<int:error-channel ref="exceptionHandlerChannel" overwrite="true" />
<int:header name="systemtime" expression="T(java.lang.System).currentTimeMillis()" />
<int:header name="nanotime" expression="T(java.lang.System).nanoTime()" />
</int:header-enricher>
I want to process data as soon as it arrives. I am concerned about the case where data arrives much faster than the separate thread can process it. According to the documentation, calling send on entryChannel should return immediately. Does the dispatcher have an internal queuing mechanism to ensure that data is handed over to the channel? How can we ensure that data is handed over as soon as it arrives? I am interested in the best practice in Spring Integration (SI) for processing data in a separate thread as soon as it arrives.
Upvotes: 1
Views: 535
Reputation: 121177
First of all, your issue is here:
<task:executor id="singleThreadedExecutor" pool-size="1" />
Regardless of how many senders there are, only one thread is able to get messages from the entryChannel, and it does so only when it is free. In your case it is busy processing the first message, then the second, and so on: one at a time, because it is a single thread.
You just need to increase the pool size (e.g. to 10) so that messages from that channel are distributed across several parallel threads.
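As a rough sketch of that change, assuming the same `task` and `int` namespace declarations as in the question, the configuration could look like the following. The bean id `multiThreadedExecutor` is a made-up name for illustration, and the value 10 is only an example.

```xml
<!-- Hypothetical id; pool-size="10" allows up to ten messages to be processed in parallel -->
<task:executor id="multiThreadedExecutor" pool-size="10" />

<int:channel id="entryChannel">
    <!-- Each message sent to entryChannel is handed to one of the pool's threads -->
    <int:dispatcher task-executor="multiThreadedExecutor"/>
</int:channel>
```

Note that with more than one thread, messages are no longer guaranteed to be processed in the order they were sent, so this only fits if ordering does not matter for your data.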
Regarding the second question: Back Pressure.
Upvotes: 1
Reputation: 174504
The task executor itself holds the tasks (that invoke the subscribed endpoint) in a queue, so yes, the caller returns immediately (as long as there's room in the queue, which is always true with your configuration, because the queue is unbounded when no queue-capacity is set).
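If an unbounded queue is a concern when data arrives faster than it can be processed, one option is to bound the queue and pick a rejection policy. The following is a minimal sketch, assuming the question's namespace declarations; the capacity of 100 is an arbitrary example value, not a recommendation.

```xml
<!-- queue-capacity bounds the executor's internal task queue (100 is an example value).
     With rejection-policy="CALLER_RUNS", once the queue is full and the pool thread is busy,
     the sending thread runs the task itself, which slows the sender down. -->
<task:executor id="singleThreadedExecutor"
               pool-size="1"
               queue-capacity="100"
               rejection-policy="CALLER_RUNS" />
```

With the default policy (ABORT), a full queue would instead cause the task to be rejected with an exception, so the choice depends on whether the senders may be blocked or must fail fast.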
Upvotes: 0