Mark Pegasov

Reputation: 5289

Concurrently find and remove element from collection or wait

import java.util.ArrayList;
import java.util.List;

public class MyClass {
    private List<Integer> resources = new ArrayList<>();

    public synchronized Integer getAndRemoveResourceOrWait(Integer requestedResource) throws InterruptedException {
        // Wait while no stored resource is large enough for the request.
        while (resources.stream().noneMatch(r -> r >= requestedResource)) {
            wait();
        }
        // Take the first resource that satisfies the request.
        Integer found = resources.stream()
                .filter(r -> r >= requestedResource)
                .findFirst()
                .get();
        resources.remove(found);
        return found;
    }

    public void addResource(Integer resource) {
        resources.add(resource);
        notifyAll();
    }
}

Thread "A" occasionally invokes addResource with a random value. Several other threads actively invoke getAndRemoveResourceOrWait.

What do I need to do to let the method getAndRemoveResourceOrWait work concurrently?

For example, thread "X" invokes getAndRemoveResourceOrWait with the value 128, which does not exist in the resources collection, so it starts waiting for it. While it is waiting, thread "Y" invokes getAndRemoveResourceOrWait with the value 64, which does exist in the resources collection. Thread "Y" should not have to wait for thread "X" to complete.

Upvotes: 0

Views: 195

Answers (1)

Stephen C

Reputation: 718906

What do I need to do to let the method getAndRemoveResourceOrWait work concurrently?

It simply needs to run on a different thread to the one that calls addResource(resource).

Note that getAndRemoveResourceOrWait is a blocking (synchronous) operation in the sense that the thread making the call is blocked until it gets the answer. However, one thread that is calling getAndRemoveResourceOrWait does not block another thread calling it. The key is that the wait() call releases the mutex, and then reacquires it after the mutex is notified. What will happen here is that a notifyAll will cause all waiting threads to wake up, one at a time.
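To make the point about wait() releasing the mutex concrete, here is a minimal sketch. The names ResourcePool and WaitReleasesMonitorDemo are made up for illustration, and the sketch assumes both methods are synchronized so that it actually runs. Thread "X" parks waiting for 128 while the calling thread, playing the role of "Y", still gets 64 straight away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for MyClass, with both methods synchronized (an assumption).
class ResourcePool {
    private final List<Integer> resources = new ArrayList<>();

    synchronized Integer getAndRemoveResourceOrWait(Integer requested) throws InterruptedException {
        while (true) {
            Optional<Integer> found = resources.stream().filter(r -> r >= requested).findFirst();
            if (found.isPresent()) {
                resources.remove(found.get()); // remove(Object), not remove(int index)
                return found.get();
            }
            wait(); // releases the monitor on this while the thread is parked
        }
    }

    synchronized void addResource(Integer resource) {
        resources.add(resource);
        notifyAll();
    }
}

class WaitReleasesMonitorDemo {
    // Returns {value obtained by X, value obtained by Y}.
    static int[] run() {
        ResourcePool pool = new ResourcePool();
        pool.addResource(64);
        int[] results = new int[2];
        Thread x = new Thread(() -> {
            try {
                results[0] = pool.getAndRemoveResourceOrWait(128);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            x.start();
            // Spin until X is parked inside wait().
            while (x.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            // "Y" is the calling thread: this returns even though X is still waiting.
            results[1] = pool.getAndRemoveResourceOrWait(64);
            pool.addResource(200); // wakes X, whose request can now be satisfied
            x.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("X got " + r[0] + ", Y got " + r[1]); // prints "X got 200, Y got 64"
    }
}
```

If wait() kept hold of the mutex, the call for 64 would never return, because X would still own the monitor on the pool.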

However, there is a bug in your addResource method. The method needs to be declared as synchronized. If you call notifyAll() while the current thread does not hold the mutex on this, you will get an IllegalMonitorStateException. (And this is also necessary to ensure that the updates to the shared resources object are visible ... in both directions.)
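The exception is easy to reproduce in isolation. This is only a small sketch; NotifyWithoutLockDemo and its two helper methods are hypothetical names, and a plain Object stands in for the MyClass instance.

```java
class NotifyWithoutLockDemo {
    // Calls notifyAll() without owning the monitor; reports whether it threw.
    static boolean unsynchronizedNotifyThrows() {
        Object monitor = new Object();
        try {
            monitor.notifyAll();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls notifyAll() while owning the monitor; reports whether it threw.
    static boolean synchronizedNotifyThrows() {
        Object monitor = new Object();
        try {
            synchronized (monitor) {
                monitor.notifyAll();
            }
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock throws: " + unsynchronizedNotifyThrows()); // true
        System.out.println("with lock throws: " + synchronizedNotifyThrows());      // false
    }
}
```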

Also, this implementation is not going to scale well:

  • Each waiting thread will scan the entire resource list on every update; i.e. on every call to addResource.
  • When a waiting thread finds a resource, it will scan the list twice more to remove it.
  • All of this is done while holding the mutex on the shared MyClass instance ... which blocks addResource as well.

UPDATE - Assuming that the Resource values are unique, a better solution would be to replace ArrayList with TreeSet. This should work:

import java.util.TreeSet;

public class MyClass {
    private TreeSet<Integer> resources = new TreeSet<>();

    public synchronized Integer getAndRemoveResourceOrWait(
            Integer resource) throws InterruptedException {
        while (true) {
            Integer found = resources.tailSet(resource, true).pollFirst();
            if (found != null) {
                return found;
            }
            wait();
        }
    }

    public synchronized void addResource(Integer resource) {
        resources.add(resource);
        notifyAll();
    }
}
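The loop depends on tailSet(resource, true) returning a live view of the set, so that pollFirst() on the view also removes the element from resources itself, and returns null when nothing is large enough. A quick sketch of just that step, with the made-up class name TailSetPollDemo and arbitrary sample values:

```java
import java.util.TreeSet;

class TailSetPollDemo {
    // Returns "<hit> <miss> <remaining set>" for a small sample set.
    static String run() {
        TreeSet<Integer> resources = new TreeSet<>();
        resources.add(10);
        resources.add(64);
        resources.add(200);

        // Smallest element >= 50; it is removed from the backing set as well.
        Integer hit = resources.tailSet(50, true).pollFirst();
        // Nothing >= 300, so the view is empty and pollFirst() returns null.
        Integer miss = resources.tailSet(300, true).pollFirst();

        return hit + " " + miss + " " + resources;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "64 null [10, 200]"
    }
}
```

Both the lookup and the removal are O(log n) on a TreeSet, compared with the repeated linear scans of the ArrayList version.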

(I also tried ConcurrentSkipListSet but I couldn't figure out a way to avoid using a mutex while adding and removing. If you were trying to remove an equal resource, it could be done ...)

Upvotes: 1
