Reputation: 136371
Consider a few web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose role is keeping the last N requests from all servers.
For example (N=3):
Server a: "Request id = ABCD" Status keeper=["ABCD"]
Server b: "Request id = XYZZ" Status keeper=["ABCD", "XYZZ"]
Server c: "Request id = 1234" Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO" Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR" Status keeper=["1234", "FOO", "BAR"]
At any point in time, the "Status keeper" might be called from a monitoring application that reads these last N requests for an SLA report.
What's the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?
CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I'm not sure what the optimal way to make it efficiently thread-safe is.
Upvotes: 27
Views: 25813
Reputation:
Since you specifically ask to give writers (that is, the web servers) higher priority than the reader (that is, the monitoring application), I would suggest the following design.
Web servers add request information to a concurrent queue. A dedicated thread reads that queue and adds each request to a thread-local (and therefore non-synchronized) queue that overwrites its oldest element, like EvictingQueue or CircularFifoQueue.
After every request it processes, this same thread checks a flag that indicates whether a report has been requested and, if so, produces a report from all elements present in the thread-local queue.
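A minimal sketch of this design, using only the JDK. The class and method names (`DedicatedThreadKeeper`, `record`, `requestReport`) are made up for illustration, an `ArrayDeque` stands in for EvictingQueue/CircularFifoQueue, and the "report requested" flag is modelled as a request object passed through the same queue, which is an assumption that keeps the report ordered after everything recorded before it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical name; not part of any library.
class DedicatedThreadKeeper implements Runnable {
    private final int capacity;
    // Shared inbox: holds request ids (String) and report requests (CompletableFuture).
    private final BlockingQueue<Object> inbox = new LinkedBlockingQueue<>();
    // Touched only by the keeper thread, so it needs no synchronization.
    private final ArrayDeque<String> lastN = new ArrayDeque<>();

    DedicatedThreadKeeper(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Called by web servers: never waits, because the inbox is unbounded.
    void record(String requestId) {
        inbox.add(requestId);
    }

    // Called by the monitoring application: completes with a copy of the last N ids.
    CompletableFuture<List<String>> requestReport() {
        CompletableFuture<List<String>> report = new CompletableFuture<>();
        inbox.add(report);
        return report;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void run() {
        try {
            while (true) {
                Object message = inbox.take();
                if (message instanceof String) {
                    // Evict the oldest id once the buffer is full.
                    if (lastN.size() == capacity) lastN.removeFirst();
                    lastN.addLast((String) message);
                } else {
                    ((CompletableFuture<List<String>>) message).complete(new ArrayList<>(lastN));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop when interrupted
        }
    }
}
```

The keeper would run on its own thread (for example a daemon thread started at application startup); the web servers only ever pay for one `add` on the inbox.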
Upvotes: 0
Reputation: 15678
Maybe you want to look at Disruptor - Concurrent Programming Framework; there is a PDF that compares it with java.util.concurrent.ArrayBlockingQueue.
If the library is too much, stick to java.util.concurrent.ArrayBlockingQueue.
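As a rough sketch of the ArrayBlockingQueue route: the queue is bounded but does not overwrite on its own, so the wrapper below drops the oldest entry whenever `offer` reports that the queue is full. The class name `BlockingQueueKeeper` and its methods are hypothetical. Note that the evict-then-retry step is not atomic, so under contention the exact eviction order between competing producers is not guaranteed, only the size bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical wrapper; keeps at most `capacity` of the most recent ids.
class BlockingQueueKeeper {
    private final ArrayBlockingQueue<String> queue;

    BlockingQueueKeeper(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by web servers: offer() never blocks; it returns false when full.
    void record(String requestId) {
        while (!queue.offer(requestId)) {
            queue.poll(); // drop the oldest entry and retry
        }
    }

    // Called by the monitor: copies the current contents, oldest first.
    List<String> snapshot() {
        return new ArrayList<>(queue);
    }
}
```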
Upvotes: 3
Reputation: 6801
If it were me, I would use the CircularFifoBuffer as you indicated, and synchronize on the buffer when writing (add). When the monitoring application wants to read the buffer, synchronize on the buffer, and then copy or clone it to use for reporting.
This suggestion is predicated on the assumption that copying/cloning the buffer to a new object takes minimal time. If there are a large number of elements and copying is slow, then this is not a good idea.
Pseudo-Code example:
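// Called by the web servers; the lock makes each add atomic.
public void writeRequest(String requestID) {
    synchronized (buffer) {
        buffer.add(requestID);
    }
}

// Called by the monitoring application; copies under the same lock
// so the report works on a stable snapshot.
public Collection<String> getRequests() {
    synchronized (buffer) {
        return new ArrayList<String>(buffer);
    }
}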
Upvotes: 0
Reputation: 26723
Hazelcast's Queue offers almost everything you ask for, but doesn't support circularity. But from your description I am not sure if you actually need it.
Upvotes: 1
Reputation: 533710
I would have a look at ArrayDeque, or, for a more concurrent implementation, at the Disruptor library, which is one of the most sophisticated/complex ring buffers in Java.
An alternative is to use an unbounded queue, which is more concurrent as the producer never needs to wait for the consumer; see Java Chronicle.
Unless your needs justify the complexity, an ArrayDeque may be all you need.
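ArrayDeque itself is not thread-safe, so a sketch of the simple option has to guard it with a lock. The class below (`DequeKeeper`, a made-up name) uses plain `synchronized` methods; that choice is an assumption, not something stated above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper around ArrayDeque that keeps the last `capacity` ids.
class DequeKeeper {
    private final int capacity;
    private final ArrayDeque<String> deque;

    DequeKeeper(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.deque = new ArrayDeque<>(capacity);
    }

    // Called by web servers: evict the oldest id when full, then append.
    synchronized void record(String requestId) {
        if (deque.size() == capacity) {
            deque.pollFirst();
        }
        deque.addLast(requestId);
    }

    // Called by the monitor: returns a copy, oldest first.
    synchronized List<String> snapshot() {
        return new ArrayList<>(deque);
    }
}
```

Both operations hold the lock only for a handful of array operations, so for small N the writers should rarely wait long for a reader.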
Upvotes: 2
Reputation: 65859
Here's a lock-free ring buffer implementation. It implements a fixed-size buffer - there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering rather than getting your data structure to filter.
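// Imports needed by the container and by the test methods further down.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**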
* Container
* ---------
*
* A lock-free container that offers a close-to O(1) add/remove performance.
*
*/
public class Container<T> implements Iterable<T> {
// The capacity of the container.
final int capacity;
// The list.
AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
// TESTING {
AtomicLong totalAdded = new AtomicLong(0);
AtomicLong totalFreed = new AtomicLong(0);
AtomicLong totalSkipped = new AtomicLong(0);
private void resetStats() {
totalAdded.set(0);
totalFreed.set(0);
totalSkipped.set(0);
}
// TESTING }
// Constructor
public Container(int capacity) {
this.capacity = capacity;
// Construct the list.
Node<T> h = new Node<T>();
Node<T> it = h;
// One created, now add (capacity - 1) more
for (int i = 0; i < capacity - 1; i++) {
// Add it.
it.next = new Node<T>();
// Step on to it.
it = it.next;
}
// Make it a ring.
it.next = h;
// Install it.
head.set(h);
}
// Empty ... NOT thread safe.
public void clear() {
Node<T> it = head.get();
for (int i = 0; i < capacity; i++) {
// Trash the element
it.element = null;
// Mark it free.
it.free.set(true);
it = it.next;
}
// Clear stats.
resetStats();
}
// Add a new one.
public Node<T> add(T element) {
// Get a free node and attach the element.
totalAdded.incrementAndGet();
return getFree().attach(element);
}
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
// Keep count of skipped.
totalSkipped.addAndGet(skipped);
if (skipped < capacity) {
// Put the head as next.
// Doesn't matter if it fails. That would just mean someone else was doing the same.
head.set(freeNode.next);
} else {
// We hit the end! No more free nodes.
throw new IllegalStateException("Capacity exhausted.");
}
return freeNode;
}
// Mark it free.
public void remove(Node<T> it, T element) {
totalFreed.incrementAndGet();
// Remove the element first.
it.detach(element);
// Mark it as free.
if (!it.free.compareAndSet(false, true)) {
throw new IllegalStateException("Freeing a freed node.");
}
}
// The Node class. It is static so needs the <T> repeated.
public static class Node<T> {
// The element in the node.
private T element;
// Are we free?
private AtomicBoolean free = new AtomicBoolean(true);
// The next reference in whatever list I am in.
private Node<T> next;
// Construct a node of the list
private Node() {
// Start empty.
element = null;
}
// Attach the element.
public Node<T> attach(T element) {
// Sanity check.
if (this.element == null) {
this.element = element;
} else {
throw new IllegalArgumentException("There is already an element attached.");
}
// Useful for chaining.
return this;
}
// Detach the element.
public Node<T> detach(T element) {
// Sanity check.
if (this.element == element) {
this.element = null;
} else {
throw new IllegalArgumentException("Removal of wrong element.");
}
// Useful for chaining.
return this;
}
public T get () {
return element;
}
@Override
public String toString() {
return element != null ? element.toString() : "null";
}
}
// Provides an iterator across all items in the container.
public Iterator<T> iterator() {
return new UsedNodesIterator<T>(this);
}
// Iterates across used nodes.
private static class UsedNodesIterator<T> implements Iterator<T> {
// Where next to look for the next used node.
Node<T> it;
int limit = 0;
T next = null;
public UsedNodesIterator(Container<T> c) {
// Snapshot the head node at this time.
it = c.head.get();
limit = c.capacity;
}
public boolean hasNext() {
// Made into a `while` loop to fix issue reported by @Nim in code review
while (next == null && limit > 0) {
// Scan to the next non-free node.
while (limit > 0 && it.free.get() == true) {
it = it.next;
// Step down 1.
limit -= 1;
}
if (limit != 0) {
next = it.element;
}
}
return next != null;
}
public T next() {
T n = null;
if ( hasNext () ) {
// Give it to them.
n = next;
next = null;
// Step forward.
it = it.next;
limit -= 1;
} else {
// Not there!!
throw new NoSuchElementException ();
}
return n;
}
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
Separator comma = new Separator(",");
// Keep counts too.
int usedCount = 0;
int freeCount = 0;
// I will iterate the list myself as I want to count free nodes too.
Node<T> it = head.get();
int count = 0;
s.append("[");
// Scan to the end.
while (count < capacity) {
// Is it in-use?
if (it.free.get() == false) {
// Grab its element.
T e = it.element;
// Is it null?
if (e != null) {
// Good element.
s.append(comma.sep()).append(e.toString());
// Count them.
usedCount += 1;
} else {
// Probably became free while I was traversing.
// Because the element is detached before the entry is marked free.
freeCount += 1;
}
} else {
// Free one.
freeCount += 1;
}
// Next
it = it.next;
count += 1;
}
// Decorate with counts "]used+free".
s.append("]").append(usedCount).append("+").append(freeCount);
if (usedCount + freeCount != capacity) {
// Perhaps something was added/freed while we were iterating.
s.append("?");
}
return s.toString();
}
}
Note that this is close to O(1) put and get. A Separator just emits "" the first time around and then its parameter from then on.
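The Separator class itself is not shown here. A guess at what it might look like, based only on that description and on the `sep()` and `reset()` calls in the code, is:

```java
// Assumed reconstruction of the helper; the original class is not shown.
class Separator {
    private final String separator;
    private boolean first = true;

    Separator(String separator) {
        this.separator = separator;
    }

    // Returns "" on the first call, then the separator on every later call.
    String sep() {
        if (first) {
            first = false;
            return "";
        }
        return separator;
    }

    // Makes the next sep() call behave like the first one again.
    void reset() {
        first = true;
    }
}
```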
Edit: Added test methods.
// ***** Following only needed for testing. *****
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");
private static synchronized void log(boolean toStdoutToo, String s) {
if (Debug) {
if (toStdoutToo) {
System.out.println(s);
}
log(s);
}
}
private static synchronized void log(String s) {
if (Debug) {
try {
log.writeLn(logName, s);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
static volatile boolean testing = true;
// Tester object to exercise the container.
static class Tester<T> implements Runnable {
// My name.
T me;
// The container I am testing.
Container<T> c;
public Tester(Container<T> container, T name) {
c = container;
me = name;
}
private void pause() {
try {
Thread.sleep(0);
} catch (InterruptedException ex) {
testing = false;
}
}
public void run() {
// Spin on add/remove until stopped.
while (testing) {
// Add it.
Node<T> n = c.add(me);
log("Added " + me + ": " + c.toString());
pause();
// Remove it.
c.remove(n, me);
log("Removed " + me + ": " + c.toString());
pause();
}
}
}
static final String[] strings = {
"One", "Two", "Three", "Four", "Five",
"Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);
public static void main(String[] args) throws InterruptedException {
Debug = true;
log.delete(logName);
Container<String> c = new Container<String>(10);
// Simple add/remove
log(true, "Simple test");
Node<String> it = c.add(strings[0]);
log("Added " + c.toString());
c.remove(it, strings[0]);
log("Removed " + c.toString());
// Capacity test.
log(true, "Capacity test");
ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
// Fill it.
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
log("Added " + strings[i] + " " + c.toString());
}
// Add one more.
try {
c.add("Wafer thin mint!");
} catch (IllegalStateException ise) {
log("Full!");
}
c.clear();
log("Empty: " + c.toString());
// Iterate test.
log(true, "Iterator test");
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
}
StringBuilder all = new StringBuilder ();
Separator sep = new Separator(",");
for (String s : c) {
all.append(sep.sep()).append(s);
}
log("All: "+all);
for (int i = 0; i < strings.length; i++) {
c.remove(nodes.get(i), strings[i]);
}
sep.reset();
all.setLength(0);
for (String s : c) {
all.append(sep.sep()).append(s);
}
log("None: " + all.toString());
// Multiple add/remove
log(true, "Multi test");
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
log("Added " + strings[i] + " " + c.toString());
}
log("Filled " + c.toString());
for (int i = 0; i < strings.length - 1; i++) {
c.remove(nodes.get(i), strings[i]);
log("Removed " + strings[i] + " " + c.toString());
}
c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
log("Empty " + c.toString());
// Multi-threaded add/remove
log(true, "Threads test");
c.clear();
for (int i = 0; i < TEST_THREADS; i++) {
Thread t = new Thread(new Tester<String>(c, strings[i]));
t.setName("Tester " + strings[i]);
log("Starting " + t.getName());
t.start();
}
// Wait for 10 seconds.
long stop = System.currentTimeMillis() + 10 * 1000;
while (System.currentTimeMillis() < stop) {
Thread.sleep(100);
}
// Stop the testers.
testing = false;
// Wait some more.
Thread.sleep(1 * 100);
// Get stats.
double added = c.totalAdded.doubleValue();
double skipped = c.totalSkipped.doubleValue();
//double freed = c.freed.doubleValue();
log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}
Upvotes: 8
Reputation: 46990
Also have a look at java.util.concurrent.
Blocking queues will block until there is something to consume or (optionally) space to produce:
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
Concurrent linked queue is non-blocking and uses a slick algorithm that allows a producer and consumer to be active concurrently:
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
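For the non-blocking route, one possible sketch pairs a ConcurrentLinkedQueue with an AtomicInteger counter, since the queue is unbounded and its `size()` walks the whole list. The class name `LockFreeKeeper` and the trimming strategy are assumptions for illustration. The queue may briefly hold more than N entries while producers race, so the snapshot trims any extras from the old end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock-free "last N" keeper built on ConcurrentLinkedQueue.
class LockFreeKeeper {
    private final int capacity;
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    // Tracks the number of entries so producers avoid the O(n) size() call.
    private final AtomicInteger count = new AtomicInteger();

    LockFreeKeeper(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Called by web servers: append, then evict one old entry if over capacity.
    void record(String requestId) {
        queue.add(requestId);
        if (count.incrementAndGet() > capacity && queue.poll() != null) {
            count.decrementAndGet();
        }
    }

    // Called by the monitor: weakly consistent copy, trimmed to the newest N.
    List<String> snapshot() {
        List<String> copy = new ArrayList<>(queue);
        int extra = copy.size() - capacity;
        return extra > 0 ? new ArrayList<>(copy.subList(extra, copy.size())) : copy;
    }
}
```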
Upvotes: 1
Reputation: 11184
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
Upvotes: 22