Reputation: 31
I started building a Spring Integration app in which the input gateway generates a fixed number (50) of records and then stops. There are basic filters/routers/transformers in the middle, and the final service activator and task executor are configured as follows:
<int:service-activator input-channel="inChannel" output-channel="outChannel" ref="svcProcessor">
    <int:poller fixed-rate="100" task-executor="myTaskExecutor"/>
</int:service-activator>
<task:executor id = "myTaskExecutor" pool-size="5" queue-capacity="100"/>
I added some debug output at the beginning of the svcProcessor method:
@Qualifier(value="myTaskExecutor")
@Autowired
ThreadPoolTaskExecutor executor;

@ServiceActivator
public Order processOrder(Order order) {
    log.debug("---- " + "executor size: " + executor.getActiveCount() +
        " q: " + executor.getThreadPoolExecutor().getQueue().size() +
        " r: " + executor.getThreadPoolExecutor().getQueue().remainingCapacity() +
        " done: " + executor.getThreadPoolExecutor().getCompletedTaskCount() +
        " task: " + executor.getThreadPoolExecutor().getTaskCount()
    );
    //
    // process order takes up to 5 seconds.
    //
    return order;
}
After the program runs for a while, the log shows the queue size growing past 50, and eventually I get a rejection exception:
23:38:31.096 DEBUG [myTaskExecutor-2] ---- executor size: 5 q: 44 r: 56 done: 11 task: 60
23:38:31.870 DEBUG [myTaskExecutor-5] ---- executor size: 5 q: 51 r: 49 done: 11 task: 67
23:38:33.600 DEBUG [myTaskExecutor-4] ---- executor size: 5 q: 69 r: 31 done: 11 task: 85
23:32:46.792 DEBUG [myTaskExecutor-1] ---- executor size: 5 q: 72 r: 28 done: 11 task: 88
The active count and the sum of queue size and remaining capacity look right for the configured values of 5 and 100, but it is not clear to me why there are more than 50 entries in the queue, or why the task count is also larger than the limit of 50.
Am I looking at the wrong info from the executor and the queue?
Thanks
UPDATE: (not sure if I should open another question)
I tried the XML version of the cafeDemo from spring-integration (branch SI3.0.x) with the pool given in the documentation, but with a 100-millisecond rate and an added queue capacity:
<int:service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks">
    <int:poller task-executor="pool" fixed-rate="100"/>
</int:service-activator>
<task:executor id="pool" pool-size="5" queue-capacity="200"/>
When I ran it, it also threw a rejection exception around the 20th delivery:
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c31732b[Running, pool size = 5, active threads = 5, queued tasks = 200, completed tasks = 0]]
Only about 32 orders had been placed before the exception, so I am not sure why it reports queued tasks = 200 and completed tasks = 0.
THANKS
Upvotes: 3
Views: 2057
Reputation: 3339
getTaskCount()
Returns the approximate total number of tasks that have ever been scheduled for execution since the executor started, so it only increases over time.
The other counters are also approximate rather than exact, according to the Java documentation:
getCompletedTaskCount()
Returns the approximate total number of tasks that have completed execution.
getActiveCount()
Returns the approximate number of threads that are actively executing tasks.
getTaskCount() and getCompletedTaskCount() keep growing over time, because they include every task handed to the executor since your code started. Neither is tied to the 50 records you generate: they count tasks submitted to the executor, not records. getActiveCount(), on the other hand, counts threads, so it is bounded by the pool size (5 in your configuration) rather than by 50.
Reference: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
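To see how these counters relate to each other, here is a minimal sketch using a plain java.util.concurrent.ThreadPoolExecutor (not Spring's ThreadPoolTaskExecutor). The class name ExecutorCountersDemo and the method run are hypothetical, made up for illustration. It assumes that something keeps handing tasks to the executor faster than the slow tasks finish, which is one plausible reading of a poller firing every 100 ms against work that takes seconds; that reading of the poller is an assumption, not something confirmed in the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: fills a bounded pool with blocked tasks and reads its counters.
public class ExecutorCountersDemo {

    /** Returns {activeCount, queueSize, taskCount, acceptedBeforeRejection}. */
    public static long[] run(int poolSize, int queueCapacity) throws InterruptedException {
        // Fixed-size pool with a bounded queue and the default rejection policy (AbortPolicy).
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));

        CountDownLatch started = new CountDownLatch(poolSize); // all pool threads are busy
        CountDownLatch release = new CountDownLatch(1);        // lets the "slow" tasks finish

        // Stand-in for slow work: each task blocks until released.
        Runnable slowTask = () -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Keep submitting until the executor refuses: threads busy and queue full.
        int accepted = 0;
        try {
            while (true) {
                executor.execute(slowTask);
                accepted++;
            }
        } catch (RejectedExecutionException expected) {
            // Same condition Spring reports as TaskRejectedException.
        }

        started.await(); // make sure every pool thread is inside a task before sampling
        long[] snapshot = {
                executor.getActiveCount(),   // threads running tasks, bounded by pool size
                executor.getQueue().size(),  // tasks waiting, bounded by queue capacity
                executor.getTaskCount(),     // everything ever accepted
                accepted
        };

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] s = run(5, 100);
        System.out.println("active=" + s[0] + " queued=" + s[1]
                + " taskCount=" + s[2] + " accepted=" + s[3]);
    }
}
```

With a pool of 5 and a queue capacity of 100, the executor accepts 105 tasks (5 running plus 100 queued) before rejecting the next one, and getTaskCount() reports everything accepted so far. The counts depend only on how many tasks were submitted, not on how many records the application produced.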
Upvotes: 2