Reputation: 63
I'm trying to find a less clunky solution to a Java concurrency problem.
The gist of the problem is that I need a shutdown call to block while there are still worker threads active, but the crucial aspect is that the worker tasks are each spawned and completed asynchronously so the hold and release must be done by different threads. I need them to somehow send a signal to the shutdown thread once their work has completed. Just to make things more interesting, the worker threads cannot block each other so I'm unsure about the application of a Semaphore in this particular instance.
I have a solution which I think safely does the job, but my unfamiliarity with the Java concurrency utils leads me to think that there might be a much easier or more elegant pattern. Any help in this regard would be greatly appreciated.
Here's what I have so far, fairly sparse except for the comments:
final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
volatile private int activeWorkerThreads;
private boolean isShutdown;

private void workerTask()
{
    try
    {
        // Point A: Worker tasks mustn't block each other.
        shutdownLock.readLock().lock();

        // Point B: I only want worker tasks to continue if the shutdown signal
        // hasn't already been received.
        if (isShutdown)
            return;

        activeWorkerThreads ++;

        // Point C: This async method call returns immediately, soon after which
        // we release our lock. The shutdown thread may then acquire the write lock
        // but we want it to continue blocking until all of the asynchronous tasks
        // have completed.
        executeAsynchronously(new Runnable()
        {
            @Override
            final public void run()
            {
                try
                {
                    // Do stuff.
                }
                finally
                {
                    // Point D: Release of shutdown thread loop, if there are no other
                    // active worker tasks.
                    activeWorkerThreads --;
                }
            }
        });
    }
    finally
    {
        shutdownLock.readLock().unlock();
    }
}

final public void shutdown()
{
    try
    {
        // Point E: Shutdown thread must block while any worker threads
        // have breached Point A.
        shutdownLock.writeLock().lock();

        isShutdown = true;

        // Point F: Is there a better way to wait for this signal?
        while (activeWorkerThreads > 0)
            ;

        // Do shutdown operation.
    }
    finally
    {
        shutdownLock.writeLock().unlock();
    }
}
Thanks in advance for any help!
Russ
Upvotes: 4
Views: 1214
Reputation: 9621
You can use a semaphore in this scenario and avoid the busy wait in the shutdown() call. Think of it as a set of tickets that are handed out to workers to indicate that they are in flight. If the shutdown() method can acquire all of the tickets, then it knows that it has drained all workers and there is no activity. Because acquire() is a blocking call, shutdown() won't spin. I've used this approach for a distributed master-worker library, and it's easy to extend it to handle timeouts and retries.
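Executor executor = // ...
final int permits = // ...
final Semaphore semaphore = new Semaphore(permits);

// acquire() can be interrupted while waiting for a ticket, so the caller must handle that.
void schedule(final Runnable task) throws InterruptedException {
    semaphore.acquire();
    try {
        executor.execute(new Runnable() {
            @Override public void run() {
                try {
                    task.run();
                } finally {
                    // Hand the ticket back whether the task succeeded or threw.
                    semaphore.release();
                }
            }
        });
    } catch (RejectedExecutionException e) {
        // The task never started, so return its ticket here.
        semaphore.release();
        throw e;
    }
}

void shutDown() {
    // Blocks until every ticket is back, i.e. no task is in flight.
    semaphore.acquireUninterruptibly(permits);
    // do stuff
}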
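To see the ticket idea run end to end, here is a minimal self-contained sketch. The class name SemaphoreDrainDemo, the permit count of 4, and the cached thread pool are assumptions made for illustration, not part of the answer above. Note that the submitting thread blocks once all permits are handed out, so the permit count should be at least the maximum number of tasks you expect in flight.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and the permit count are illustrative assumptions.
final class SemaphoreDrainDemo {

    /** Schedules the given number of tasks, drains them, and returns how many completed. */
    static int runAndDrain(int tasks) {
        final int permits = 4; // assumed upper bound on in-flight tasks
        final Semaphore semaphore = new Semaphore(permits);
        final ExecutorService executor = Executors.newCachedThreadPool();
        final AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // Take one ticket per task; blocks only while all tickets are in use.
            semaphore.acquireUninterruptibly();
            executor.execute(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for the real work
                } finally {
                    semaphore.release(); // released by the worker thread, not the submitter
                }
            });
        }

        // "Shutdown": returns only once every ticket has been handed back.
        semaphore.acquireUninterruptibly(permits);
        executor.shutdown();
        return completed.get();
    }
}
```

Calling SemaphoreDrainDemo.runAndDrain(20) should return only after all 20 tasks have released their tickets.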
Upvotes: 2
Reputation: 26882
This is more of a comment on sbridges' answer, but it was a bit too long to submit as a comment.
Just one point: once you shut down the executor, submitting a new task to it results in an unchecked RejectedExecutionException if you use the default implementations (such as Executors.newSingleThreadExecutor()). So in your case you probably want to use the following code:
new ThreadPoolExecutor(1,
                       1,
                       1,
                       TimeUnit.HOURS,
                       new LinkedBlockingQueue<Runnable>(),
                       new ThreadPoolExecutor.DiscardPolicy());
This way, tasks submitted to the executor after shutdown() was called are simply ignored. The parameters above (1, 1, etc.) should produce an executor that is essentially a single-thread executor but doesn't throw the runtime exception.
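A small sketch of that behavior, assuming a hypothetical demo class and a counter added purely to observe which tasks ran: the first task is accepted, the second is submitted after shutdown() and is dropped without an exception.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the name and the counter are illustrative only.
final class DiscardAfterShutdownDemo {

    /** Returns how many of the two submitted tasks actually ran. */
    static int runDemo() {
        final AtomicInteger executed = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.HOURS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.DiscardPolicy());

        executor.execute(executed::incrementAndGet); // accepted before shutdown
        executor.shutdown();
        executor.execute(executed::incrementAndGet); // rejected, silently discarded

        try {
            // Already-accepted tasks still run to completion after shutdown().
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        return executed.get();
    }
}
```

One trade-off to keep in mind: with DiscardPolicy the caller gets no signal that its task was dropped, so this only suits work that is safe to lose.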
Upvotes: 0
Reputation: 20671
Whoa nelly. Never do this:
// Point F: Is there a better way to wait for this signal?
while (activeWorkerThreads > 0)
    ;
You're spinning and consuming CPU. Use a proper notification:
First: synchronize on an object, then check activeWorkerThreads, and wait() on the object while it's still > 0:
synchronized (mutexObject) {
    while (activeWorkerThreads > 0) {
        mutexObject.wait();
    }
}
Second: Have the workers notify() the object after they decrement the activeWorkerThreads count. You must synchronize on the object before calling notify.
synchronized (mutexObject) {
    activeWorkerThreads--;
    mutexObject.notify();
}
Third: Seeing as you are (after implementing 1 & 2) synchronizing on an object whenever you touch activeWorkerThreads, use it as protection; there is no need for the variable to be volatile.
Then: the same object you use as a mutex for controlling access to activeWorkerThreads could also be used to control access to isShutdown. Example:
synchronized (mutexObject) {
    if (isShutdown) {
        return;
    }
}
This won't cause workers to block each other except for immeasurably small amounts of time (which you likely do not avoid by using a read-write lock anyway).
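Putting the three pieces together, one possible shape is sketched below. The class WorkerTracker and its method names are hypothetical, and the sketch uses notifyAll() rather than notify() as a conservative choice in case more than one thread ever waits on the object.

```java
// Hypothetical helper class; names are illustrative and not from the original post.
final class WorkerTracker {
    private final Object mutexObject = new Object();
    private int activeWorkerThreads; // guarded by mutexObject, so no volatile needed
    private boolean isShutdown;      // guarded by mutexObject

    /** Registers a worker; returns false if shutdown has already begun. */
    boolean tryStart() {
        synchronized (mutexObject) {
            if (isShutdown) {
                return false;
            }
            activeWorkerThreads++;
            return true;
        }
    }

    /** Called from any thread when an asynchronous task has finished. */
    void finish() {
        synchronized (mutexObject) {
            activeWorkerThreads--;
            mutexObject.notifyAll(); // wake the shutdown thread so it can re-check the count
        }
    }

    /** Blocks without spinning until every registered worker has finished. */
    void shutdown() throws InterruptedException {
        synchronized (mutexObject) {
            isShutdown = true;
            while (activeWorkerThreads > 0) {
                mutexObject.wait(); // releases the monitor while waiting
            }
            // Do shutdown operation.
        }
    }
}
```

A worker would call tryStart() before handing its Runnable to the async executor and call finish() in that Runnable's finally block.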
Upvotes: 1
Reputation: 11775
ExecutorService should be the preferred solution, as sbridges mentioned.
As an alternative, if the number of worker threads is fixed, then you can use CountDownLatch:
final CountDownLatch latch = new CountDownLatch(numberOfWorkers);
Pass the latch to every worker thread and call latch.countDown() when its task is done.
Call latch.await() from the main thread to wait for all tasks to complete.
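A minimal sketch of the latch approach, assuming a hypothetical LatchDemo class, plain threads as workers, and a 10-second timeout chosen only for the example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and timeout are illustrative assumptions.
final class LatchDemo {

    /** Starts the workers and waits for them; returns true if all finished in time. */
    static boolean runWorkers(int numberOfWorkers) {
        final CountDownLatch latch = new CountDownLatch(numberOfWorkers);

        for (int i = 0; i < numberOfWorkers; i++) {
            new Thread(() -> {
                try {
                    // Do stuff.
                } finally {
                    latch.countDown(); // always count down, even if the work throws
                }
            }).start();
        }

        try {
            // Blocks (no spinning) until the count reaches zero or the timeout expires.
            return latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because a CountDownLatch cannot be reset or grown, this only fits when the number of tasks is known before the wait starts.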
Upvotes: 2
Reputation: 25150
Declaring activeWorkerThreads as volatile doesn't make activeWorkerThreads++ safe, because ++ is just shorthand for
activeWorkerThreads = activeWorkerThreads + 1;
which is a separate read and write and therefore isn't atomic. Use AtomicInteger instead.
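As a rough illustration, the sketch below has several threads increment a shared AtomicInteger. The class and method names are hypothetical; the point is only that incrementAndGet() performs the read-modify-write as a single atomic step, so no updates are lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
final class AtomicCounterDemo {

    /** Increments a shared counter from several threads and returns the final value. */
    static int countConcurrently(int threads, int incrementsPerThread) {
        final AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic replacement for counter++
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join(); // wait for each worker so the total is final
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

In the question's code the equivalent change would be incrementAndGet() at Point C and decrementAndGet() at Point D.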
Does executeAsynchronously() send jobs to an ExecutorService? If so, you can just use the awaitTermination method, so your shutdown hook will be:
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
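A fuller sketch of that shutdown sequence follows, assuming a fixed pool of four threads and a hypothetical demo class. awaitTermination can throw InterruptedException and returns false on timeout, so both cases are handled here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; pool size and names are illustrative assumptions.
final class AwaitTerminationDemo {

    /** Submits the tasks, shuts down, and returns how many tasks completed. */
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet); // stand-in for the real work
        }

        executor.shutdown(); // stop accepting new tasks; queued ones still run
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```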
Upvotes: 3