Reputation: 133
I'm using AMQP to queue multiple threads doing graph searches. The graph is not modified except on a regular interval by a separate thread. What's the best concurrency model to wait for all active searches to complete, block those threads, and allow the update thread to modify the graph before unblocking the search threads?
I've been reading through http://docs.oracle.com/javase/tutorial/essential/concurrency/ but I can't seem to find anything that exactly fits my model.
Any suggestions?
thanks!
Edit: I'm using ExecutorService to handle the threads.
Upvotes: 2
Views: 124
Reputation: 17145
Do you really need to block? Maybe non-blocking copy-on-write could suffice.
The updating thread should clone the current graph structure and apply its updates to the clone. Once the updates are done, it announces the new graph simply by overwriting the shared reference (declared volatile or held in an AtomicReference, so that other threads are guaranteed to see the change).
Each searching thread should read the graph reference once into a local variable and use only that reference for the whole search. A published graph is never modified, so there is no need for any locking or waiting.
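A minimal sketch of that idea, assuming the graph can be modelled as an immutable adjacency map. The names Graph, GraphHolder, withEdge, snapshot and publish are made up for illustration and are not taken from the question's code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable graph: adjacency lists keyed by node id.
final class Graph {
    private final Map<Integer, List<Integer>> adjacency;

    Graph(Map<Integer, List<Integer>> source) {
        // Deep-copy so later changes to the argument cannot leak in.
        Map<Integer, List<Integer>> copy = new HashMap<Integer, List<Integer>>();
        for (Map.Entry<Integer, List<Integer>> e : source.entrySet()) {
            copy.put(e.getKey(),
                    Collections.unmodifiableList(new ArrayList<Integer>(e.getValue())));
        }
        this.adjacency = Collections.unmodifiableMap(copy);
    }

    List<Integer> neighbours(int node) {
        List<Integer> n = adjacency.get(node);
        return n == null ? Collections.<Integer>emptyList() : n;
    }

    // Returns a new graph with one extra edge; this graph is left untouched.
    Graph withEdge(int from, int to) {
        Map<Integer, List<Integer>> next = new HashMap<Integer, List<Integer>>();
        for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
            next.put(e.getKey(), new ArrayList<Integer>(e.getValue()));
        }
        if (!next.containsKey(from)) {
            next.put(from, new ArrayList<Integer>());
        }
        next.get(from).add(to);
        return new Graph(next);
    }
}

// Holds the shared reference that the updater overwrites.
final class GraphHolder {
    private final AtomicReference<Graph> current;

    GraphHolder(Graph initial) {
        current = new AtomicReference<Graph>(initial);
    }

    // Search threads call this once and keep the result in a local variable.
    Graph snapshot() {
        return current.get();
    }

    // The update thread calls this after it has finished building the clone.
    void publish(Graph updated) {
        current.set(updated);
    }
}

class CopyOnWriteDemo {
    public static void main(String[] args) {
        GraphHolder holder =
                new GraphHolder(new Graph(new HashMap<Integer, List<Integer>>()));
        Graph before = holder.snapshot();       // a running search keeps using this
        holder.publish(before.withEdge(1, 2));  // the updater swaps in the clone
        System.out.println(before.neighbours(1));            // prints []
        System.out.println(holder.snapshot().neighbours(1)); // prints [2]
    }
}
```

A search that started before publish() keeps working on its old snapshot, and the old graph becomes garbage once the last such search finishes.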
Pros:
Cons:
It is also possible to apply copy-on-write to only part of the graph, which helps especially if the graph consumes a lot of memory. However, that is quite a hard topic: see MVCC and STM (Software Transactional Memory).
Upvotes: 1
Reputation: 1277
I'm not familiar with AMQP but this is a producer/consumer problem so there are several ways to deal with this in Java. This is a really quick and dirty example with Futures and ReentrantLock:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    private static final Random rand = new Random();
    // Guards futureList; the updater holds it for the whole update.
    private static final Lock lock = new ReentrantLock();
    private static final List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
    private static final ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // Stand-in for a graph search.
    private static final Callable<Integer> callable = new Callable<Integer>() {
        @Override
        public Integer call() {
            return rand.nextInt();
        }
    };

    private static void doUpdate() {
        lock.lock(); // blocks new submissions until the update is finished
        try {
            // Wait for every submitted task to complete.
            for (Future<Integer> future : futureList) {
                System.out.println(future.get());
            }
            futureList.clear();
            // The graph update itself would go here.
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println();
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        // submitter thread
        Thread submitter = new Thread(new Runnable() {
            @Override
            public void run() {
                int submitCount = 0;
                while (submitCount < 10) {
                    lock.lock(); // waits here while an update is in progress
                    try {
                        futureList.add(pool.submit(callable));
                        submitCount++;
                    } finally {
                        lock.unlock();
                    }
                    try {
                        Thread.sleep(1000); // arbitrary
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        // update thread
        Thread updater = new Thread(new Runnable() {
            @Override
            public void run() {
                int updateCount = 0;
                while (updateCount < 5) {
                    doUpdate();
                    updateCount++;
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        submitter.start();
        updater.start();
        submitter.join();
        updater.join();
        pool.shutdown(); // otherwise the pool's threads keep the JVM alive
    }
}
I set the update thread to run at half the frequency of the submit thread, so if you run this you'll see the updater peel off about two integers each time it runs. The submit thread has to wait until the updater releases the lock.
There are other approaches - look into the BlockingQueue interface: you may want to experiment.
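As a starting point for that experiment, here is a rough sketch of a BlockingQueue hand-off between one producer and one consumer. It is not a full solution to the graph-update problem; the QueueDemo class, the drain method and the POISON end marker are all hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDemo {
    private static final int POISON = -1; // hypothetical end-of-work marker

    // Consumes requests until the marker arrives and returns what it handled.
    static List<Integer> drain(BlockingQueue<Integer> queue) {
        List<Integer> handled = new ArrayList<Integer>();
        try {
            while (true) {
                int request = queue.take(); // blocks while the queue is empty
                if (request == POISON) {
                    break;
                }
                handled.add(request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        // Producer: stands in for whatever hands out search requests.
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 3; i++) {
                        queue.put(i);
                    }
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        System.out.println(drain(queue)); // prints [0, 1, 2]
        producer.join();
    }
}
```

The queue does the waiting for you: take() blocks the consumer until work arrives, so no explicit flag or sleep loop is needed.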
Upvotes: 0