Reputation: 1119
I'm looking for a Java collection that supports blocking read()s on a predicate. I wrote a simple version, but it seems like this must have been invented already?
For example:
interface PredicateConsumerCollection<T> {
    public void put(T t);

    @Nullable
    public T get(Predicate<T> p, long millis) throws InterruptedException;
}
put() delivers its argument to a waiting consumer with a matching predicate, or stashes it in a store. A get() returns immediately if a suitable T is already in the store, blocks until a suitable value is put(), or times out. Consumers compete, but fairness isn't critical in my case.
Is anyone aware of such a collection?
Upvotes: 2
Views: 846
Reputation: 1535
I also need something very similar for testing that a certain asynchronous JMS message has been received within a certain timeout. It turns out that what you describe is relatively easy to implement using basic wait/notify, as explained in the Oracle tutorials. The idea is to make the put and query methods synchronized and let the query method call wait. (Note that my code names the blocking method query rather than get, so the interface has to declare it under that name.) The put method calls notifyAll to wake up any threads waiting in the query method. Each woken thread must then re-check whether its predicate is matched. The trickiest part is getting the timeout right: a thread can wake up while its predicate still does not match, and there can be "spurious wakeups", so the remaining wait time has to be recomputed on every pass. I found this stackoverflow post that provides the answer.
Here is the implementation I came up with:
import java.util.ArrayList;
import java.util.List;
// import net.jcip.annotations.GuardedBy;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;

public class PredicateConsumerCollectionImpl<T> implements
        PredicateConsumerCollection<T> {

    // @GuardedBy("this")
    private final List<T> elements = new ArrayList<>();

    @Override
    public synchronized void put(T t) {
        elements.add(t);
        // Wake every waiting consumer so each can re-check its own predicate.
        notifyAll();
    }

    // Called query here; it plays the role of get() in the question's interface.
    // Note that a matched element stays in the list; it is not removed.
    @Override
    public synchronized T query(Predicate<T> p, long millis)
            throws InterruptedException {
        T match = null;
        long nanosOfOneMilli = 1000000L;
        long endTime = System.nanoTime() + millis * nanosOfOneMilli;
        // Loop so that non-matching puts and spurious wakeups are handled.
        while ((match = Iterables.find(elements, p, null)) == null) {
            long sleepTime = endTime - System.nanoTime();
            if (sleepTime <= 0) {
                return null; // timed out
            }
            wait(sleepTime / nanosOfOneMilli,
                    (int) (sleepTime % nanosOfOneMilli));
        }
        return match;
    }

    synchronized boolean contains(T t) {
        return elements.contains(t);
    }
}
And here is a JUnit test that shows the code working as intended:
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Predicate;

/**
 * Unit test for the {@link PredicateConsumerCollection} implementation.
 *
 * <p>
 * The tests act as consumers waiting for the test Producer to put a certain
 * String.
 */
public class PredicateConsumerCollectionTest {

    private static class Producer implements Runnable {
        private PredicateConsumerCollection<String> collection;

        public Producer(PredicateConsumerCollection<String> collection) {
            this.collection = collection;
            collection.put("Initial");
        }

        @Override
        public void run() {
            try {
                int millis = 50;
                collection.put("Hello");
                Thread.sleep(millis);
                collection.put("I");
                Thread.sleep(millis);
                collection.put("am");
                Thread.sleep(millis);
                collection.put("done");
                Thread.sleep(millis);
                collection.put("so");
                Thread.sleep(millis);
                collection.put("goodbye!");
            } catch (InterruptedException e) {
                // A fail() here would not be reported by JUnit, because this is
                // not the test thread. Restore the interrupt flag and stop.
                Thread.currentThread().interrupt();
            }
        }
    }

    private PredicateConsumerCollectionImpl<String> collection;
    private Producer producer;

    @Before
    public void setup() {
        collection = new PredicateConsumerCollectionImpl<>();
        producer = new Producer(collection);
    }

    @Test(timeout = 2000)
    public void wait_for_done() throws InterruptedException {
        assertTrue(collection.contains("Initial"));
        assertFalse(collection.contains("Hello"));
        Thread producerThread = new Thread(producer);
        producerThread.start();
        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "done".equals(s);
            }
        }, 1000);
        assertEquals("done", result);
        assertTrue(collection.contains("Hello"));
        assertTrue(collection.contains("done"));
        assertTrue(producerThread.isAlive());
        assertFalse(collection.contains("goodbye!"));
        producerThread.join();
        assertTrue(collection.contains("goodbye!"));
    }

    @Test(timeout = 2000)
    public void wait_for_done_immediately_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();
        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "Initial".equals(s);
            }
        }, 1000);
        assertEquals("Initial", result);
        assertFalse(collection.contains("I"));
        producerThread.join();
        assertTrue(collection.contains("goodbye!"));
    }

    @Test(timeout = 2000)
    public void wait_for_done_never_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();
        assertTrue(producerThread.isAlive());
        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "DONE".equals(s);
            }
        }, 1000);
        assertEquals(null, result);
        assertFalse(producerThread.isAlive());
        assertTrue(collection.contains("goodbye!"));
    }
}
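If matched elements should be handed to exactly one consumer, which is how I read "consumers compete" in the question, a variant along the following lines might be closer. This is only a rough sketch under a few assumptions: the class name ConsumingPredicateStore is made up for illustration, it uses java.util.function.Predicate (Java 8+) instead of Guava, and it swaps wait/notify for a ReentrantLock with Condition.awaitNanos, which returns the time still remaining and so keeps the timeout bookkeeping short.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/** Hypothetical variant: get() removes the element it returns. */
class ConsumingPredicateStore<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition added = lock.newCondition();
    private final List<T> elements = new LinkedList<>();

    public void put(T t) {
        lock.lock();
        try {
            elements.add(t);
            // Wake all waiters; each one re-checks its own predicate.
            added.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns and removes the first match, or null if none arrives in time. */
    public T get(Predicate<? super T> p, long millis) throws InterruptedException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(millis);
        lock.lock();
        try {
            while (true) {
                for (Iterator<T> it = elements.iterator(); it.hasNext();) {
                    T candidate = it.next();
                    if (p.test(candidate)) {
                        it.remove(); // consume it so no other waiter sees it
                        return candidate;
                    }
                }
                if (remaining <= 0) {
                    return null; // timed out
                }
                // awaitNanos returns an estimate of the nanoseconds still left,
                // so the loop also copes with spurious wakeups.
                remaining = added.awaitNanos(remaining);
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Because every put wakes all waiters and each one rescans the list, this is only reasonable for small stores and few consumers.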
Upvotes: 0
Reputation: 14279
There is no ready-made class that solves your problem, but a combination of a ConcurrentHashMap and a BlockingQueue could be a solution.
The hash map is defined as:
final ConcurrentHashMap<Predicate, LinkedBlockingQueue<Result>> lookup;
The put needs to ensure that a queue is added to the map for each Predicate; this can be done in a thread-safe way using putIfAbsent.
If you have a fixed set of Predicates, you can simply pre-fill the map, and a Consumer can then simply call lookup.get(Predicate).take().
If the number of Predicates is unknown or too large, you need to write your own wait/notify implementation for Consumers, in case a Predicate is not yet in the map.
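As a rough illustration of the map-of-queues idea, here is a minimal sketch. It assumes each element can be filed under a key with proper equals/hashCode (arbitrary predicates usually cannot serve as map keys), and the names KeyedBlockingStore, queueFor and take are made up for this example. It also lets consumers create a missing queue themselves via computeIfAbsent (Java 8+), which is one possible way to sidestep the "key not yet in the map" case.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: one blocking queue per key. */
class KeyedBlockingStore<K, V> {
    private final ConcurrentMap<K, BlockingQueue<V>> lookup =
            new ConcurrentHashMap<>();

    // Atomically creates the queue for a key the first time it is needed,
    // whether a producer or a consumer gets there first.
    private BlockingQueue<V> queueFor(K key) {
        return lookup.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
    }

    public void put(K key, V value) {
        queueFor(key).add(value); // unbounded queue, so add never blocks
    }

    /** Waits up to millis for a value under key; returns null on timeout. */
    public V take(K key, long millis) throws InterruptedException {
        return queueFor(key).poll(millis, TimeUnit.MILLISECONDS);
    }
}
```

The trade-off is that queues for keys nobody ever produces stay in the map, so this fits best when the set of keys is bounded.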
Upvotes: 2