Nick Boudreau

Reputation: 133

Looking for a Java concurrency model suggestion

I'm using AMQP to queue multiple threads doing graph searches. The graph is not modified except at regular intervals by a separate thread. What's the best concurrency model to wait for all active searches to complete, block those threads, and allow the update thread to modify the graph before unblocking the search threads?

I've been reading through http://docs.oracle.com/javase/tutorial/essential/concurrency/ but I can't seem to find anything that exactly fits my model.

Any suggestions?

Thanks!

Edit: I'm using ExecutorService to handle the threads.

Upvotes: 2

Views: 124

Answers (2)

gertas

Reputation: 17145

Do you really need to block? Maybe non-blocking copy-on-write could suffice.

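The updating thread should make a clone of the current graph structure and apply its updates to the clone. Once it is done, it publishes the new graph by simply overwriting the shared reference. In Java that reference should be volatile (or held in an AtomicReference) so that readers are guaranteed to see the fully built graph.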

Each searching thread should read the shared graph reference once into a local variable and use that for the whole search. A published graph is never modified, so there is no need for any locking or waiting.
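To make the idea concrete, here is a minimal sketch of the swap. The class name SnapshotGraph, the adjacency-map representation, and the addEdge method are all assumptions for illustration, not anything from the question; a real graph type would need its own way of being copied.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder: the graph is an adjacency map published through one shared reference.
class SnapshotGraph {
    private final AtomicReference<Map<String, Set<String>>> current =
            new AtomicReference<>(Collections.<String, Set<String>>emptyMap());

    // Search threads call this once per search and keep the result in a local variable.
    // They must treat the returned snapshot as read-only.
    Map<String, Set<String>> snapshot() {
        return current.get();
    }

    // Update thread (assumed to be the only writer): copy, modify the copy, publish.
    void addEdge(String from, String to) {
        Map<String, Set<String>> old = current.get();
        Map<String, Set<String>> copy = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : old.entrySet()) {
            copy.put(e.getKey(), new HashSet<>(e.getValue())); // copy neighbor sets too
        }
        copy.computeIfAbsent(from, k -> new HashSet<>()).add(to);
        copy.computeIfAbsent(to, k -> new HashSet<>());
        // Overwrite the shared reference; searches already running keep their old snapshot.
        current.set(Collections.unmodifiableMap(copy));
    }
}
```

A search would start with something like `Map<String, Set<String>> g = graph.snapshot();` and never go back to the shared reference until it is finished.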

Pros:

  • readers never lock or wait,
  • if an update fails, readers still work with the old structure,
  • ideal for long-running, occasional updates where there are far more reads than updates,
  • the garbage collector takes care of old graphs.

Cons:

  • some readers may operate on old data if they started before the update. This can be handled by checking whether the shared graph reference has changed and, if it has, restarting the whole operation.
  • multiple writers may introduce conflicting changes to the graph. There are several conflict resolution techniques for this; the easiest is to ignore the other writer's changes and overwrite them ("take mine").
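Both workarounds can be sketched on top of an AtomicReference. The helper names searchOnLatest and update are made up for this example, and the writer shown here retries on conflict rather than doing a blind "take mine" overwrite (a plain set() on the reference would be the "take mine" variant).

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical helpers; G is whatever immutable graph type is being published.
class SnapshotRetry {
    // Reader: search a snapshot, and redo the search if a newer graph was published meanwhile.
    static <G, R> R searchOnLatest(AtomicReference<G> ref, Function<G, R> search) {
        while (true) {
            G snapshot = ref.get();
            R result = search.apply(snapshot);
            if (ref.get() == snapshot) { // reference unchanged: result is up to date
                return result;
            }
        }
    }

    // Writer: build a modified copy and publish it only if no other writer got there first.
    // copyAndModify must return a new object and leave its argument untouched.
    static <G> G update(AtomicReference<G> ref, UnaryOperator<G> copyAndModify) {
        while (true) {
            G old = ref.get();
            G next = copyAndModify.apply(old);
            if (ref.compareAndSet(old, next)) { // fails if another writer published in between
                return next;
            }
        }
    }
}
```

Whether restarting a long search is worth it depends on how stale a result you can tolerate; with frequent updates the reader loop could restart many times.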

It is also possible to apply copy-on-write to only part of the graph, which helps especially if the graph is a memory-consuming structure. However, that is quite a hard topic: see MVCC and STM (Software Transactional Memory).

Upvotes: 1

spudone

Reputation: 1277

I'm not familiar with AMQP, but this is a producer/consumer problem, so there are several ways to deal with it in Java. Here is a really quick and dirty example with Futures and ReentrantLock:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    private static Random rand = new Random();
    private static final Lock lock = new ReentrantLock();
    // volatile so that a write by one thread is visible to the other threads
    private static volatile boolean updating = false;

    private static List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
    private static ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static Callable<Integer> callable = new Callable<Integer>() {
        @Override
        public Integer call() {
            return rand.nextInt();
        }
    };

    private static void doUpdate() {
        if (lock.tryLock()) {
            updating = true;
            try {
                for (Future<Integer> future : futureList) {
                    System.out.println(future.get());
                }
                futureList.clear();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println();
                updating = false; // clear the flag while the lock is still held
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // submitter thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                int submitCount = 0;
                while (submitCount < 10) {
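                    if (!updating) {
                        // Hold the same lock as the updater while touching the
                        // shared list, so it is never modified mid-iteration.
                        lock.lock();
                        try {
                            futureList.add(pool.submit(callable));
                            submitCount++;
                        } finally {
                            lock.unlock();
                        }
                    }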

                    try {
                        Thread.sleep(1000); // arbitrary
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
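                }
                // No more submissions: already queued tasks still finish,
                // and the pool threads no longer keep the JVM alive.
                pool.shutdown();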
            }
        }).start();

        // update thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                int updateCount = 0;
                while (updateCount < 5) {
                    doUpdate();
                    updateCount++;
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

I set the update thread to run at half the frequency of the submit thread, so if you run this you'll see the updater peel off roughly two integers each time it runs. The submit thread does not submit anything while an update is in progress.

There are other approaches - look into the BlockingQueue interface: you may want to experiment.
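As a rough idea of what a BlockingQueue version could look like: in the sketch below, searches and updates are put on the same queue and a single consumer thread runs them in order, so an update can never overlap a search. That is an assumption of this sketch, and it gives up running searches in parallel. The class name QueueDemo and the STOP sentinel are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDemo {
    // Sentinel task that tells the consumer to stop (an assumption of this sketch).
    private static final Runnable STOP = () -> { };

    static List<String> run() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<String> log = new ArrayList<>(); // only touched by the consumer thread

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks while the queue is empty
                    if (task == STOP) {
                        return;
                    }
                    task.run(); // one task at a time, in submission order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The queue is unbounded, so add() never fails for lack of capacity.
        queue.add(() -> log.add("search 1"));
        queue.add(() -> log.add("update"));
        queue.add(() -> log.add("search 2"));
        queue.add(STOP);

        try {
            consumer.join(); // after join, the consumer's writes to log are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [search 1, update, search 2]
    }
}
```

In a real system the Runnables would be replaced by whatever request objects come off the AMQP side.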

Upvotes: 0
