user4339860

Reputation: 31

Spring Integration task executor queue fills with more tasks than records

I started building a Spring Integration app in which the input gateway generates a fixed number (50) of records and then stops. There are basic filters/routers/transformers in the middle, and the final service activator and task executor are configured as follows:

<int:service-activator input-channel="inChannel" output-channel="outChannel" ref="svcProcessor">
        <int:poller fixed-rate="100" task-executor="myTaskExecutor"/>
</int:service-activator>

<task:executor id="myTaskExecutor" pool-size="5" queue-capacity="100"/>

I put some debug output at the beginning of the svcProcessor method:

@Qualifier(value="myTaskExecutor")
@Autowired 
ThreadPoolTaskExecutor executor;

@ServiceActivator
public Order processOrder(Order order) {
  log.debug("---- " + "executor size: " + executor.getActiveCount() + 
        " q: " + executor.getThreadPoolExecutor().getQueue().size() + 
        " r: " + executor.getThreadPoolExecutor().getQueue().remainingCapacity()+
        " done: " + executor.getThreadPoolExecutor().getCompletedTaskCount() +
        " task: " + executor.getThreadPoolExecutor().getTaskCount()
    );
   //
   //process order takes up to 5 seconds.
   //
   return order;
}

After the program has run for a while, the log shows the queue size climbing past 50, and eventually a rejection exception is thrown:

23:38:31.096 DEBUG [myTaskExecutor-2] ---- executor size: 5 q: 44 r: 56 done: 11 task: 60
23:38:31.870 DEBUG [myTaskExecutor-5] ---- executor size: 5 q: 51 r: 49 done: 11 task: 67
23:38:33.600 DEBUG [myTaskExecutor-4] ---- executor size: 5 q: 69 r: 31 done: 11 task: 85
23:32:46.792 DEBUG [myTaskExecutor-1] ---- executor size: 5 q: 72 r: 28 done: 11 task: 88

The active count and the sum of queue size and remaining capacity match the configured values of 5 and 100, but I don't understand why there are more than 50 entries in the queue, or why the task count also exceeds the 50 records that were generated.

Am I looking at the wrong info from the executor and the queue?

Thanks

UPDATE: (not sure if I should open another question)

I tried the XML version of the cafeDemo from spring-integration (branch SI3.0.x) with the pool given in the documentation, but with a fixed rate of 100 milliseconds and an added queue capacity:

<int:service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks">
    <int:poller task-executor="pool" fixed-rate="100"/>          
</int:service-activator>
<task:executor id="pool" pool-size="5" queue-capacity="200"/>

When I ran it, it also threw a rejection exception after around the 20th delivery:

 org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c31732b[Running, pool size = 5, active threads = 5, queued tasks = 200, completed tasks = 0]]

Only about 32 orders had been placed before the exception, so I am not sure why it reports queued tasks = 200 and completed tasks = 0.

THANKS

Upvotes: 3

Views: 2057

Answers (1)

Panther

Reputation: 3339

getTaskCount() returns the (approximate) total number of tasks that have been scheduled on the executor since it started, so it only grows over time.

The other values are also approximate rather than exact, according to the Java documentation:

  1. getCompletedTaskCount(): returns the approximate total number of tasks that have completed execution.
  2. getActiveCount(): returns the approximate number of threads that are actively executing tasks.

getTaskCount() and getCompletedTaskCount() keep growing over time because they include every task handed to the executor since your code started, not just the ones currently pending, so neither is limited to your 50 records. getActiveCount(), on the other hand, is bounded by the pool size (5 in your configuration) and, being approximate, may be slightly off at any given moment.

Reference: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
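
To make the counters concrete, here is a minimal plain-JDK sketch (no Spring involved). It assumes, as one plausible explanation for the numbers in the question, that the poller hands a new polling task to the executor on every trigger firing whether or not a record is waiting, so the queue holds poll tasks rather than records. The class name ExecutorBacklogDemo and the method run are made up for illustration, and the latch only stands in for slow processing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spring Integration or the question's code.
public class ExecutorBacklogDemo {

    // Submits `submissions` blocking tasks to a fixed-size pool with a bounded queue.
    // Returns {activeCount, queueSize, taskCount, rejectedCount}, read while every
    // worker thread is still busy.
    public static long[] run(int poolSize, int queueCapacity, int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        // Lets us wait until the worker threads have actually started their tasks.
        CountDownLatch workersBusy = new CountDownLatch(Math.min(poolSize, submissions));
        CountDownLatch release = new CountDownLatch(1);
        long rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        workersBusy.countDown();
                        try {
                            release.await(); // stands in for slow order processing
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // thrown once all threads are busy and the queue is full
                }
            }
            workersBusy.await();
            return new long[] {
                pool.getActiveCount(),   // threads currently running a task
                pool.getQueue().size(),  // tasks waiting for a free thread
                pool.getTaskCount(),     // every task accepted so far: done + running + queued
                rejected
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // unblock the workers so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] r = run(5, 100, 120);
        // prints: active=5 queued=100 tasks=105 rejected=15
        System.out.println("active=" + r[0] + " queued=" + r[1]
                + " tasks=" + r[2] + " rejected=" + r[3]);
    }
}
```

With 5 threads and a queue of 100, at most 105 tasks are accepted at once and anything beyond that is rejected, which mirrors the TaskRejectedException in the question. If the assumption above holds for your setup, the count that matters is how often the poller fires compared with how long each task takes, not how many records were generated.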

Upvotes: 2
