Reputation: 2431
I have an integration flow that looks like this:
a) a poller drops messages into a queue channel
b) a service activator picks up messages from the queue; I have specified an executor on the poller for this service activator
c) the service activator sends the processed messages to a publish-subscribe channel
d) the publish-subscribe channel has multiple consumers, which receive the message for further processing
What I am wondering is what role the executor plays in this flow.
a) My service activator that polls from the queue has an executor. Let's say it has a fixed pool of 10 threads and max-messages-per-poll is 5, so I am assuming that 5 new messages will be processed in one go by 5 separate threads from the pool.
b) Assuming that's correct, what happens when the 5 different messages reach the publish-subscribe channel? Let's say this pub-sub channel has 3 subscribers. Does that mean that internally 3 new threads will be spawned to asynchronously pass the incoming message to the 3 subscribers, so that the subscribers/consumers can process the message in parallel? At this point it gets a little fuzzy to me how things are going to be processed. Basically, I am wondering whether each consumer is handed the message and the consumer chains run in parallel, and if so, where the executor for that is.
Any comments would be appreciated. Thanks.
++++++++++++++++++++++++++++++
Based on the input below, I tried this out:
<int:service-activator
    input-channel="filesIn"
    output-channel="readyFiles"
    ref="handler">
    <int:poller fixed-delay="3000" max-messages-per-poll="3" />
</int:service-activator>

<int:publish-subscribe-channel id="readyFiles" task-executor="executor">
</int:publish-subscribe-channel>

<int:service-activator
    id="consumer1"
    input-channel="readyFiles"
    ref="handler"
/>
<int:service-activator
    id="consumer2"
    input-channel="readyFiles"
    ref="handler"
/>
<int:service-activator
    id="consumer3"
    input-channel="readyFiles"
    ref="handler"
/>

<task:executor id="executor" pool-size="10" rejection-policy="CALLER_RUNS"/>
My handler simply slows things down a bit:
public void handle(File file) throws InterruptedException {
    log.debug(Thread.currentThread().getName() + " executing for file " + file.getName() + " ...");
    Thread.sleep(3000);
    log.debug(Thread.currentThread().getName() + " completed for file " + file.getName());
}
Because I am using a task executor on the pub-sub channel, I was expecting all 3 consumers to be started together, but instead the result is sequential invocation.
Output:
18:44:11.000 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 1.txt ...
18:44:14.001 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 1.txt
18:44:14.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 2.txt ...
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 2.txt
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 3.txt ...
18:44:20.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 3.txt
Each subscriber is invoked only after the previous one has completed.
Upvotes: 1
Views: 1927
Reputation: 121542
<publish-subscribe-channel> has a task-executor option, which makes the channel invoke its handlers on threads of that Executor. That means your subscribers might process the message in parallel. But, of course, there is no guarantee, as it depends on the nature and state of the Executor.
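To illustrate how the Executor's nature and state can remove the parallelism, here is a minimal sketch in plain java.util.concurrent (not Spring configuration). The class name CallerRunsDemo and the deliberately tiny pool (one worker, no queue capacity) are assumptions made only for this illustration; they are not the settings from the question. When such a pool is saturated, a caller-runs rejection policy executes the task on the submitting thread, so the work is no longer handed off.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that ran a task submitted while the pool was saturated. */
    static String threadUsedWhenSaturated() {
        // Hypothetical pool for illustration: one worker, no queue capacity, caller-runs on rejection.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Occupy the only worker until the latch is released.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is saturated, so this task is rejected and run by the calling thread.
            executor.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            executor.shutdown();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller = " + Thread.currentThread().getName()
                + ", task ran on = " + threadUsedWhenSaturated());
    }
}
```

In this sketch the second task reports the caller's own thread name, which is the kind of situation in which subscribers would end up being invoked one after another despite an executor being present.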
I think this snippet from the source code (BroadcastingDispatcher) should help you:
for (final MessageHandler handler : handlers) {
    if (this.executor != null) {
        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                invokeHandler(handler, messageToSend);
            }
        });
        dispatched++;
    }
    else {
        if (this.invokeHandler(handler, messageToSend)) {
            dispatched++;
        }
    }
}
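The effect of that loop can be tried out without Spring. The following is a simplified stand-in written for illustration, not the framework's code: the names BroadcastSketch, Handler, dispatch and threadsUsed are hypothetical. It records which threads the subscribers run on, with and without an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BroadcastSketch {

    /** Hypothetical stand-in for a message handler. */
    interface Handler {
        void handle(String message);
    }

    /** Same shape as the loop above: hand off to the executor if present, else call directly. */
    static int dispatch(List<Handler> handlers, Executor executor, String message) {
        int dispatched = 0;
        for (Handler handler : handlers) {
            if (executor != null) {
                executor.execute(() -> handler.handle(message));
            } else {
                handler.handle(message);
            }
            dispatched++;
        }
        return dispatched;
    }

    /** Dispatches one message to the given number of subscribers and returns the thread names used. */
    static Set<String> threadsUsed(Executor executor, int subscribers) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(subscribers);
        List<Handler> handlers = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            handlers.add(message -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        dispatch(handlers, executor, "1.txt");
        try {
            // Wait for handlers that were handed off to other threads.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("no executor: " + threadsUsed(null, 3));
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            System.out.println("pool of 3:   " + threadsUsed(pool, 3));
        } finally {
            pool.shutdown();
        }
    }
}
```

Without an executor, all three handlers run on the calling thread, one after another. With a fresh fixed pool of three threads, each handler is handed to its own worker, so the subscribers can overlap.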
Upvotes: 0