Jordan

Reputation: 1

Is there a way for two Threads to not get the same item from a shared collection?

I have a program that has 3 threads. One thread is dedicated to adding items to a shared collection and the remaining two threads are for removing an item from the collection and processing it.

I created a Runnable for adding. It looks like this:

public class Adder implements Runnable {

    private static final Integer MAX_NO = 1000; // I wanted this to be Integer.MAX_VALUE
    private final List<Integer> shared;
    private final AtomicBoolean isAddingDone;

    public Adder(final List<Integer> shared, final AtomicBoolean isAddingDone) {
        this.shared = shared;
        this.isAddingDone = isAddingDone;
    }

    @Override
    public void run() {
        for (int i = 0; i < MAX_NO; i++) {
            shared.add(i);
        }
        isAddingDone.set(true);
    }

}

I created another Runnable that removes an item from the shared collection.

public class Remover implements Runnable {

    private final int id;
    private boolean stopRequested = false;
    private final List<Integer> shared;
    private final AtomicBoolean isAddingDone;

    public Remover(final int id, final List<Integer> shared, final AtomicBoolean isAddingDone) {
        this.id = id;
        this.shared = shared;
        this.isAddingDone = isAddingDone;
    }

    public synchronized void requestStop() {
        this.stopRequested = true;
    }

    public synchronized boolean isStopRequested() {
        return this.stopRequested;
    }

    @Override
    public void run() {
        System.out.println("Starting Remover" + id + "...");
        while (!isStopRequested()) {
            if (shared.isEmpty() && isAddingDone.get()) {
                this.requestStop();
            } else {
                final Integer value = shared.get(0);
                System.out.println("Remover" + this.id + " retrieved " + value + ". Removing...");
                shared.remove(value);
                // processing of value here
            }
        }
    }

}

Here is the main method:

public static void main(String[] args) {
    final List<Integer> shared = new Vector<>();
    final AtomicBoolean isAddingDone = new AtomicBoolean(false);

    final Runnable tAdder = new Adder(shared, isAddingDone);
    final Runnable tRemover0 = new Remover(0, shared, isAddingDone);
    final Runnable tRemover1 = new Remover(1, shared, isAddingDone);

    Thread t1 = new Thread(tAdder, "Thread-Adder");
    t1.start();

    Thread r0 = new Thread(tRemover0, "Thread-Remover0");
    r0.start();

    Thread r1 = new Thread(tRemover1, "Thread-Remover1");
    r1.start();
}

The program runs without errors, but I noticed that at some point both remover threads retrieve the same item:

Starting Remover0...
Starting Remover1...
Remover1 retrieved 0. Removing...
Remover0 retrieved 0. Removing...
Remover1 retrieved 1. Removing...
Remover0 retrieved 1. Removing...
Remover1 retrieved 2. Removing...
Remover1 retrieved 3. Removing...

Is there a way to make sure the two removers never process the same item? I tried using an iterator, but that threw a ConcurrentModificationException at runtime.

Upvotes: 0

Views: 37

Answers (1)

Andy Turner

Reputation: 140534

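shared.isEmpty(), shared.get(0) and shared.remove(value) are three separate calls. Vector synchronizes each call on its own, but the sequence as a whole is not atomic, so there is no guarantee that the list hasn't changed in between. Both removers can call get(0) and see the same head element before either of them removes it.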
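
To make that interleaving concrete, here is a small sketch that replays, on a single thread, one order in which two removers could issue their calls. The class name InterleavingDemo and the method replay() are made up for illustration; the sketch only mimics the ordering and is not how the real threads are scheduled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Vector;

public class InterleavingDemo {

    // Replays one possible ordering of calls from two removers (hypothetical helper).
    // Returns {value seen by remover 0, value seen by remover 1, final list size}.
    static int[] replay() {
        final List<Integer> shared = new Vector<>(Arrays.asList(0, 1, 2));

        final Integer seenByRemover0 = shared.get(0); // Remover0 reads the head
        final Integer seenByRemover1 = shared.get(0); // Remover1 reads it too, before any removal

        shared.remove(seenByRemover0); // remove(Object): takes 0 out of the list
        shared.remove(seenByRemover1); // 0 is already gone, so this removes nothing

        return new int[] {seenByRemover0, seenByRemover1, shared.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay()));
    }
}
```

Both "removers" end up holding the same value even though every individual call on the Vector was thread-safe.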

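If you want to communicate between threads, use a BlockingQueue instead of a List. Its poll() method removes and returns the head of the queue if one is available, or returns null if the queue is empty. This combines the isEmpty(), get and remove steps into a single atomic action, so each item is handed to exactly one thread.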

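// From the while loop inside Remover, with shared declared as a BlockingQueue<Integer>:
            // Read the flag before polling: if adding was already done and the
            // queue is then empty, no further items can arrive.
            final boolean addingDone = isAddingDone.get();
            Integer value = shared.poll();
            if (value == null) {
                // The queue was empty when you looked.
                if (addingDone) {
                    this.requestStop();
                }
            } else {
                System.out.println("Remover" + this.id + " retrieved " + value + ". Removing...");
                // No need to remove, poll() already did.
                // processing of value here
            }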
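
For reference, the snippet above can be expanded into a complete program along these lines. This is only a sketch under a few assumptions: LinkedBlockingQueue is one possible BlockingQueue implementation (the choice is mine), and the names BlockingQueueDemo, runDemo and taken are hypothetical helpers added so the result can be checked. The removers record what they took instead of printing it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockingQueueDemo {

    // Runs one adder and removerCount removers; returns every value the removers took.
    static List<Integer> runDemo(final int itemCount, final int removerCount) {
        final BlockingQueue<Integer> shared = new LinkedBlockingQueue<>();
        final AtomicBoolean isAddingDone = new AtomicBoolean(false);
        final List<Integer> taken = Collections.synchronizedList(new ArrayList<>());

        final Thread adder = new Thread(() -> {
            for (int i = 0; i < itemCount; i++) {
                shared.add(i);
            }
            isAddingDone.set(true);
        }, "Thread-Adder");

        final List<Thread> removers = new ArrayList<>();
        for (int id = 0; id < removerCount; id++) {
            removers.add(new Thread(() -> {
                while (true) {
                    // Read the flag first: if it is already true and poll() then
                    // returns null, the queue is empty for good.
                    final boolean done = isAddingDone.get();
                    final Integer value = shared.poll(); // atomically takes the head, or null
                    if (value != null) {
                        taken.add(value); // stand-in for "processing of value"
                    } else if (done) {
                        return;
                    }
                    // Otherwise the adder is still running; loop and poll again.
                }
            }, "Thread-Remover" + id));
        }

        adder.start();
        removers.forEach(Thread::start);
        try {
            adder.join();
            for (Thread remover : removers) {
                remover.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for threads", e);
        }
        return taken;
    }

    public static void main(String[] args) {
        final List<Integer> taken = runDemo(1000, 2);
        System.out.println("Items taken: " + taken.size());
        System.out.println("Distinct items: " + new HashSet<>(taken).size());
    }
}
```

The removers here spin while the queue is temporarily empty. If that matters, the timed poll(long, TimeUnit) overload of BlockingQueue lets a remover wait briefly for an item instead.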

Upvotes: 2
