Reputation: 191
I am new to multi-threading and would like to avoid a race condition that is occurring in the below code. In the release() method there is a line available.add(resource) and in the remove() method there is a line available.remove(resource). So my question is how do I synchronize the 'resource' variable to avoid this race condition?
package threadpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ResourcePoolImpl<R> implements ResourcePool<R> {
private static final String CLOSED_POOL_EXCEPTION = "Pool is closed,cannot aquire resource.";
private static final String RELEASE_EXCEPTION = "Unaquired resource, cannot release it.";
private volatile boolean open = false;
private final BlockingQueue<R> available = new LinkedBlockingQueue<R>();
private final ConcurrentMap<R, CountDownLatch> aquired = new ConcurrentHashMap<R, CountDownLatch>();
public R acquire() throws InterruptedException {
if ( !open ) { throw new IllegalStateException( CLOSED_POOL_EXCEPTION ); }
final R resource = available.take();
if ( resource != null ) {
aquired.put( resource, new CountDownLatch( 1 ) );
}
return resource;
}
public R acquire( final long timeout, final TimeUnit timeUnit ) throws InterruptedException {
if ( !open ) { throw new IllegalStateException( CLOSED_POOL_EXCEPTION ); }
final R resource = available.poll( timeout, timeUnit );
if ( resource != null ) {
aquired.put( resource, new CountDownLatch( 1 ) );
}
return resource;
}
public boolean add( final R resource )
{
return available.add( resource );
}
public void close() throws InterruptedException {
open = false;
for ( final CountDownLatch latch : aquired.values() ) {
latch.await();
}
}
public void closeNow() {
open = false;
}
public boolean isOpen() {
return open;
}
public void open() {
open = true;
}
public void release( final R resource )
{
final CountDownLatch latch = aquired.get( resource );
if ( latch == null ) { throw new IllegalArgumentException( RELEASE_EXCEPTION ); }
available.add( resource );
latch.countDown();
}
public boolean remove( final R resource ) throws InterruptedException
{
final CountDownLatch latch = aquired.get( resource );
if ( latch != null ) {
latch.await();
}
return available.remove( resource );
}
public boolean removeNow( final R resource ) {
return available.remove( resource );
}
}
Upvotes: 3
Views: 693
Reputation: 17955
Declare a
final Object mutex = new Object();
and have all methods that do read/write operations on shared collections aquire the mutex before doing the operations, or make decisions based on shared data, execute within a synchronized block:
synchronized (mutex) {
// .. guaranteed single-threaded access here
// (for instance, contents of aquire() or release();
// also add() or any other collection access)
}
You can then use the simpler non-concurrent collection classes, as within the mutex-guarded areas, there cannot be any multi-threaded access.
Concurrent collections simply wrap their accesses within their own internal mutual-exclusion locks -- but the problem, as you explain in your comments, is that aquired
and available
may be updated independently of each other, which you definitely do not want.
Therefore: simplify your code by declaring and using a single mutex for all critical-area accesses.
Upvotes: 2