MrDuk

Reputation: 18242

How can I ensure an ExecutorService pool has completed, without shutting it down?

Currently, I'm making sure my tasks have finished before moving on like so:

    ExecutorService pool = Executors.newFixedThreadPool(5);

    public Set<Future> EnqueueWork(StreamWrapper stream) {
        Set<Future> futureObjs = new HashSet<>();
        util.setData(stream);
        Callable callable = util;

        Future future = pool.submit(callable);
        futureObjs.add(future);

        pool.shutdown();
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Node.sendTCP(Node.getNodeByHostname(StorageTopology.getNextPeer()), Coordinator.prepareForTransport(stream));

        return futureObjs;
    }

However, because of some other threading on my socket, it's possible that multiple calls are made to EnqueueWork - I'd like to make sure the calls to .submit have completed in the current thread, without shutting down the pool for subsequent threads coming in.

Is this possible?

Upvotes: 0

Views: 160

Answers (4)

Ravindra babu

Reputation: 38910

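You have already added the Future to the set. To wait for each task, call get() with a timeout on every Future in the set. get() blocks until the task finishes or the timeout expires, and returns the task's result, so the pool does not need to be shut down.

In my example the timeout is 60 seconds. You can change it to suit your requirements.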

Sample code:

        try {
            for (Future future : futureObjs) {
                // get() blocks until the task completes or the timeout expires
                System.out.println("future.result = " + future.get(60000, TimeUnit.MILLISECONDS));
            }
        } catch (Exception err) {
            err.printStackTrace();
        }
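For a fuller picture, here is a minimal, self-contained sketch of the same idea. The class name FutureWaitSketch and the helper awaitAll are hypothetical names chosen for illustration, and the trivial tasks stand in for the real work. It assumes the timeout is meant per task rather than for the whole batch, and it wraps the checked exceptions in IllegalStateException, which is only one of several reasonable choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWaitSketch {

    // Waits for each future in turn; the timeout applies to each get() call separately.
    public static <T> List<T> awaitAll(List<Future<T>> futures, long timeout, TimeUnit unit) {
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            try {
                results.add(future.get(timeout, unit));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new IllegalStateException("Interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Task failed", e.getCause());
            } catch (TimeoutException e) {
                throw new IllegalStateException("Task did not finish in time", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<Integer>> firstBatch = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int n = i;
                firstBatch.add(pool.submit(() -> n * n));
            }
            System.out.println(awaitAll(firstBatch, 60, TimeUnit.SECONDS));

            // The pool was never shut down, so a later caller can still submit work.
            List<Future<Integer>> secondBatch = new ArrayList<>();
            secondBatch.add(pool.submit(() -> 42));
            System.out.println(awaitAll(secondBatch, 60, TimeUnit.SECONDS));
        } finally {
            pool.shutdown(); // only once the application is finished with the pool
        }
    }
}
```

Each caller waits only on the futures it created, so concurrent callers do not interfere with one another.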

Other useful posts:

How to forcefully shutdown java ExecutorService

How to wait for completion of multiple tasks in Java?

Upvotes: 0

TwoThe

Reputation: 14259

You are approaching this issue from the wrong direction. If you need to know whether your tasks are finished, you have a dependency of A->B. The executor is the wrong place to enforce that dependency, just as you wouldn't ask the engine of your car "are we there yet?".

Java offers several features to ensure that a certain state has been reached before starting a new execution path. One of them is the invokeAll method of the ExecutorService, which returns only when all the tasks passed to it have completed.

    pool.invokeAll(listOfAllMyCallables);
    // if you reach this point all callables are completed
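A slightly fuller sketch of the invokeAll approach follows. InvokeAllSketch and runBatch are hypothetical names, and the string-returning tasks are placeholders. The point it tries to show is that invokeAll blocks only the calling thread for its own batch, while the shared pool stays open for other callers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Runs one batch on the shared pool and returns once every task in it has completed.
    public static List<String> runBatch(ExecutorService pool, List<Callable<String>> tasks) {
        List<String> results = new ArrayList<>();
        try {
            // invokeAll returns the futures in the same order as the task list.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // the task is already complete here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task in the batch failed", e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "alpha");
            tasks.add(() -> "beta");
            System.out.println(runBatch(pool, tasks));
            // No shutdown happened in between, so the same pool accepts another batch.
            System.out.println(runBatch(pool, tasks));
        } finally {
            pool.shutdown();
        }
    }
}
```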

Upvotes: 0

pandaadb

Reputation: 6456

I agree with one of the comments: it seems odd that your executor can be used by different threads. Usually an executor is private to an instance of some class, but anyhow.

What you can do, per the ThreadPoolExecutor docs, is check:

getActiveCount() - Returns the approximate number of threads that are actively executing tasks.

NOTE: This is a blocking method: it takes out a lock on the workers of your thread pool and blocks until it has counted everything.

And also check:

getQueue() - Returns the task queue used by this executor. Access to the task queue is intended primarily for debugging and monitoring. This queue may be in active use. Retrieving the task queue does not prevent queued tasks from executing.

If your queue is empty and the activeCount is 0, all your tasks should have finished. I say "should" because getActiveCount says "approximate". Looking at the implementation, this is most likely because each worker internally has a flag indicating that it is locked (in use). There is in theory a slight race between a task finishing and the worker marking itself as no longer in use. A better approach would in fact be to track the futures: you would have to check that the queue is empty and that all futures are done.
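As a rough sketch of that check, assuming the pool is created directly as a ThreadPoolExecutor so that getActiveCount() and getQueue() are available (they are not part of the ExecutorService interface). PoolIdleSketch and looksIdle are hypothetical names, and the result should be treated as a best-effort snapshot only, for the reasons given above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolIdleSketch {

    // Best-effort snapshot: nothing queued and no worker currently reports running a task.
    public static boolean looksIdle(ThreadPoolExecutor pool) {
        return pool.getQueue().isEmpty() && pool.getActiveCount() == 0;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown(); // signal that the task is now running
                release.await();     // stay busy until the main thread lets go
                return null;
            });

            started.await();
            System.out.println("idle while task runs: " + looksIdle(pool));

            release.countDown();
            future.get(); // the future is the reliable completion signal
            System.out.println("future done: " + future.isDone());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that looksIdle(pool) may still briefly report a busy worker right after future.get() returns, which is why the future itself is the safer thing to wait on.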

However I think what you really need is to reverse your logic. Instead of the current thread trying to work out if another thread has submitted work in the meantime, you should have the other thread call isShutdown() and simply not submit a new task in that case.

Upvotes: 0

akki

Reputation: 451

You can check by invoking the isDone() method on all the Future objects in futureObjs, calling it in a loop until every one returns true. Calling get() on each Future is another option: since get() is a blocking call, it returns only after the task has completed and the result is ready. But do you really want to keep the pool open after all the tasks are done?
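A minimal sketch of the polling variant, for illustration only. FuturePollSketch and allDone are hypothetical names, and the 10 ms sleep between polls is an arbitrary assumption. In most cases the blocking get() is simpler, since it avoids the sleep loop entirely.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FuturePollSketch {

    // True only when every future has completed (normally, exceptionally, or by cancellation).
    public static boolean allDone(Collection<? extends Future<?>> futures) {
        for (Future<?> future : futures) {
            if (!future.isDone()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<String>> futures = new ArrayList<>();
            futures.add(pool.submit(() -> "done"));

            // Poll without blocking on any single future; the pool stays open throughout.
            while (!allDone(futures)) {
                Thread.sleep(10);
            }
            System.out.println(futures.get(0).get()); // returns immediately now
        } finally {
            pool.shutdown();
        }
    }
}
```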

Upvotes: 2
