prudvi raju
prudvi raju

Reputation: 505

Future Object with mentioned timeOut is keep increasing for the next threads(timeOut is not applying for all the threads in the ThreadPool in Java)

The Worker Thread is Defined here with heavy task of 10 Seconds in run method

import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;

public class WorkerThread implements Callable {

private String command;
private long startTime;
public WorkerThread(String s){
    this.command=s;
}

@Override
public Object call() throws Exception {
    startTime = System.currentTimeMillis();
    System.out.println(new Date()+"::::"+Thread.currentThread().getName()+" Start. Command = "+command);
    Random generator = new Random(); 
    Integer randomNumber = generator.nextInt(5); 
    processCommand();
    System.out.println(new Date()+ ":::"+Thread.currentThread().getName()+" End.::"+command+"::"+ (System.currentTimeMillis()-startTime));
    return randomNumber+"::"+this.command;
}

private void processCommand() {
    try {
        Thread.sleep(10000);
    } 
    catch (Exception e) {

        System.out.println("Interrupted::;Process Command:::"+this.command);
    }
}

@Override
public String toString(){
    return this.command;
}

}

Defined my WorkerPool with the Future get Timeout of 1 second.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class WorkerPool {

        static BlockingQueue queue=new LinkedBlockingQueue(2);
        static RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        static ThreadFactory threadFactory = Executors.defaultThreadFactory();
        static ThreadPoolExecutor executorPool = new ThreadPoolExecutor(4, 4, 11, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
        static MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
        public static void main(String args[]) throws InterruptedException, TimeoutException{
            List<Future<Integer>> list = new ArrayList<Future<Integer>>();
            for(int i=1; i< 5; i++){
                WorkerThread worker = new WorkerThread("WorkerThread:::_"+i);
                Future<Integer> future = executorPool.submit(worker);
                list.add(future);
            }

            for(Future<Integer> future : list){
                try {
                    try {
                        future.get(1000, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        future.cancel(true);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            executorPool.shutdown();
        }

    }

The Timeout of the thread is keep incresaing for the future threads, My expectation should be that if all the threads are taking more than 1 Second should close all at a time with in the ! second.

In the aboce scenario, worker thread is taking 10 sec to process, But i m timing out all my 4 threads with in 1 Seconds , but each thread time increasing incrementaly by 1 second for each task.

first thred timeOut is 1 Second second thred timeOut is 2 Second third thred timeOut is 3 Second.

Why all threads are not interupting in 1 Second itself?Any problem with my code?

Upvotes: 1

Views: 547

Answers (1)

Adam Kotwasinski
Adam Kotwasinski

Reputation: 4554

Because you are waiting sequentially in a loop in this section:

for(Future<Integer> future : list) {
  ...
  future.get(1000, TimeUnit.MILLISECONDS);
  ...
}

Basically the flow is:

 - all workers 1 .. 4 start
 - you wait for worker A to finish
 - 1 second passes, TimeoutException (worker A was alive for 1 second)
 - you wait for worker B to finish
 - 1 second passes, TimeoutException (worker B was alive for 2 seconds)
 - you wait for worker C to finish
 - 1 second passes, TimeoutException (worker C was alive for 3 seconds)
 - ... same for D ...

If you want to wait for at most 1 second for all workers you need to count how much time you spent waiting so far, and then wait for remaining time. Something like the pseudocode:

long quota = 1000
for (Future future : futures) {
  long start = System.currentTimeMillis
  try {
    future.get(quota, MILLISECONDS)
  }
  catch (TimeoutException e) {
    future.cancel(true)
  }
  finally {
    long spent = System.currentTimeMillis() - start
    quota -= spent
    if (quota < 0) {quota = 0} // the whole block is going to execute longer than .get() only
  }
}


Upvotes: 1

Related Questions