Reputation: 11565
I'm looking for a queue that would be the asynchronous (non-blocking) equivalent of java.util.concurrent.BlockingQueue. Its interface would include:
import java.util.concurrent.CompletableFuture;

public interface AsynchronousBlockingQueue<E> {
    // - if the queue is empty, return a new CompletableFuture
    //   that will be completed the next time `add` is called
    // - if the queue is not empty, return a completed CompletableFuture
    //   containing the first element of the queue
    CompletableFuture<E> poll();

    // if polling is in progress, complete the ongoing polling CompletableFuture;
    // otherwise, add the element to the queue
    // (thread safety is left to the implementation: an interface method cannot be declared `synchronized`)
    void add(E element);
}
If that matters, there should be just one poller thread, and polling should be done sequentially (poll will not be called when polling is already in progress).
I expected it to already exist in the JVM, but I couldn't find it, and of course I'd rather use something from the JVM than write it myself.
Another constraint: I'm stuck with Java 8 (even though I'm definitely interested in knowing what exists in more recent versions).
Upvotes: 7
Views: 2348
Reputation: 11565
So in the end I wrote my own class... I'm interested in comments :)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsynchronousBlockingQueue<E> {
    private CompletableFuture<E> incompletePolling = null;
    private final Queue<E> elementsQueue = new LinkedList<>();

    // if the queue is empty, return a new CompletableFuture that will be completed the next time `add` is called
    // if the queue is not empty, return a completed CompletableFuture containing the first element of the queue
    public synchronized CompletableFuture<E> poll() {
        // polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
        if (incompletePolling != null)
            throw new IllegalStateException("Polling is already ongoing");
        if (elementsQueue.isEmpty()) {
            incompletePolling = new CompletableFuture<>();
            return incompletePolling;
        }
        return CompletableFuture.completedFuture(elementsQueue.poll());
    }

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element) {
        if (incompletePolling != null) {
            CompletableFuture<E> result = incompletePolling;
            // clearing the field must be done first because the completion could trigger code that needs the queue state to be valid
            incompletePolling = null;
            result.complete(element);
            return;
        }
        elementsQueue.add(element);
    }
}
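One possible comment on the class above: `add` calls `result.complete(element)` while still holding the monitor, so any non-async dependent stages attached to the returned future (for example via `thenAccept`) run inside the lock and can delay other producers. The sketch below is only one way to address that, under the same single-poller assumption: it hands the pending future out of the synchronized block and completes it afterwards. The names `AsyncQueueSketch`, `AsyncQueueDemo`, `consume` and `run` are hypothetical and not part of any library.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical variant of the class above: same contract, but the pending
// future is completed after the lock has been released.
class AsyncQueueSketch<E> {
    private final Queue<E> elements = new LinkedList<>();
    private CompletableFuture<E> pending; // non-null only while a poll is waiting

    public synchronized CompletableFuture<E> poll() {
        if (pending != null) {
            throw new IllegalStateException("Polling is already ongoing");
        }
        if (elements.isEmpty()) {
            pending = new CompletableFuture<>();
            return pending;
        }
        return CompletableFuture.completedFuture(elements.poll());
    }

    public void add(E element) {
        CompletableFuture<E> toComplete;
        synchronized (this) {
            if (pending == null) {
                elements.add(element);
                return;
            }
            toComplete = pending;
            pending = null;
        }
        // Dependent stages run here, outside the monitor.
        toComplete.complete(element);
    }
}

class AsyncQueueDemo {
    // Sequential consumer: the next poll is issued only after the previous one completed.
    static <E> void consume(AsyncQueueSketch<E> queue, List<E> sink, int remaining) {
        if (remaining == 0) {
            return;
        }
        queue.poll().thenAccept(e -> {
            sink.add(e);
            consume(queue, sink, remaining - 1);
        });
    }

    static List<String> run() {
        AsyncQueueSketch<String> queue = new AsyncQueueSketch<>();
        List<String> received = new ArrayList<>();
        queue.add("a");              // no poll is waiting, so the element is stored
        consume(queue, received, 3); // "a" is delivered at once, then a poll waits
        queue.add("b");              // completes the waiting poll
        queue.add("c");              // completes the poll issued from the callback
        return received;
    }
}
```

In this single-threaded demo, `run()` collects the elements in insertion order. The callback re-polls from whichever thread calls `add`, so a real consumer that needs a dedicated thread would probably use `thenAcceptAsync` with its own executor instead.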
Upvotes: 4