Roman Vottner

Reputation: 12839

Lock-free container with flipping buffers

For one of my projects, which has to support concurrent reads and writes, I need a container that buffers items until a consumer takes all currently buffered items at once. Since producers should be able to keep producing regardless of whether the consumer has read the current buffer, I came up with a custom implementation based on an AtomicReference: every entry is added to a backing ConcurrentLinkedQueue until a flip is performed. A flip returns the current entry and atomically stores a new entry, with an empty queue and reset metadata, in that AtomicReference.

My solution looks like this:

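import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// FlippingDataContainerEntry (queue + key/value/auxiliary length metadata) is not shown here.
public class FlippingDataContainer<E> {

  // Holds the current entry; every put() or flip() replaces it as a whole.
  private final AtomicReference<FlippingDataContainerEntry<E>> dataObj = new AtomicReference<>();

  public FlippingDataContainer() {
    dataObj.set(new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0));
  }

  // Builds a new entry from the current one plus the value and tries to publish it;
  // retries if another thread replaced the entry in the meantime.
  public FlippingDataContainerEntry<E> put(E value) {
    if (null != value) {
      while (true) {
        FlippingDataContainerEntry<E> data = dataObj.get();
        FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
        if (dataObj.compareAndSet(data, updated)) {
          return updated;
        }
      }
    }
    return null;
  }

  // Swaps in a fresh, empty entry and hands the previous one to the caller (the consumer).
  public FlippingDataContainerEntry<E> flip() {
    FlippingDataContainerEntry<E> oldData;
    FlippingDataContainerEntry<E> newData = new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0);
    while (true) {
      oldData = dataObj.get();
      if (dataObj.compareAndSet(oldData, newData)) {
        return oldData;
      }
    }
  }

  public boolean isEmpty() {
    return dataObj.get().getQueue().isEmpty();
  }
}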

Because the new value has to be pushed onto the backing queue, some care is needed here. The current implementation of the from(data, value) method looks something like this:

// Assumes E is bounded by a type exposing getKeyAsBytes(), getValueAsBytes()
// and getAuxiliaryAsBytes() (that type is not shown in the question).
static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
  // Copy the current queue so that a failed CAS in put() leaves the shared queue untouched.
  Queue<E> queue = new ConcurrentLinkedQueue<>(data.getQueue());
  queue.add(value);
  return new FlippingDataContainerEntry<>(queue,
      data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
      data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
      data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

Because of possible retries, caused by another thread updating the value right before this thread could perform its update, I need to copy the actual queue on each write attempt. Otherwise an entry would be added to the shared queue even though the atomic reference couldn't be updated. Simply adding the value to the shared queue could therefore lead to the value being added to the queue multiple times when it should only occur once.
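The following single-threaded replay makes that duplicate concrete. It is a sketch, not the asker's code: a hypothetical Snapshot class (just the queue plus one item count, instead of the three byte-length fields) stands in for FlippingDataContainerEntry, and the writer appends to the shared queue before attempting its CAS. Writer B completes a full put between writer A's read and A's CAS, so A's retry appends its value a second time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class DuplicateOnRetryDemo {

  // Hypothetical, simplified stand-in for FlippingDataContainerEntry:
  // the queue plus a single item counter instead of key/value/auxiliary lengths.
  static final class Snapshot {
    final Queue<String> queue;
    final int count;

    Snapshot(Queue<String> queue, int count) {
      this.queue = queue;
      this.count = count;
    }
  }

  // Replays, step by step, one interleaving of two writers A and B that both
  // add their value to the shared queue BEFORE attempting the CAS.
  static Snapshot replay() {
    Queue<String> shared = new ConcurrentLinkedQueue<>();
    AtomicReference<Snapshot> ref = new AtomicReference<>(new Snapshot(shared, 0));

    // Writer A reads the current snapshot and eagerly appends "a".
    Snapshot seenByA = ref.get();
    shared.add("a");

    // Writer B runs a complete put() in between and wins the race.
    Snapshot seenByB = ref.get();
    shared.add("b");
    ref.compareAndSet(seenByB, new Snapshot(shared, seenByB.count + 1));

    // Writer A's CAS fails because the reference has changed...
    if (!ref.compareAndSet(seenByA, new Snapshot(shared, seenByA.count + 1))) {
      // ...so A retries, appending "a" a second time before succeeding.
      Snapshot retry = ref.get();
      shared.add("a");
      ref.compareAndSet(retry, new Snapshot(shared, retry.count + 1));
    }
    return ref.get();
  }

  public static void main(String[] args) {
    Snapshot s = replay();
    System.out.println(s.queue + " count=" + s.count);  // prints "[a, b, a] count=2"
  }
}
```

The queue ends up holding three elements while the count says two, so both the data and the metadata are wrong.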

Since copying the whole queue is quite expensive, I experimented with having from(data, value) reuse the current queue instead of copying it, and adding the value element to the shared queue only in the block that runs once the update has succeeded:

public FlippingDataContainerEntry<E> put(E value) {
  if (null != value) {
    while (true) {
      FlippingDataContainerEntry<E> data = dataObj.get();
      FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
      if (dataObj.compareAndSet(data, updated)) {
        // Only reached after a successful CAS, so retries can no longer duplicate the value.
        updated.getQueue().add(value);
        return updated;
      }
    }
  }
  return null;
}

Within from(data, value) I now only pass the existing queue along, without adding the value element:

static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
  // Reuses the current queue instead of copying it; put() adds the value after its CAS succeeds.
  return new FlippingDataContainerEntry<>(data.getQueue(),
      data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
      data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
      data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

While this makes the test run more than 10 times faster than the version that copies the queue, it also fails the consumption test quite often: the value can now be added to the queue right after the consumer thread has flipped the queue and processed its data, so not all items end up being consumed.
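That lost-entry failure can be replayed the same way. In this sketch (again a hypothetical Snapshot class with a single item count, not the real FlippingDataContainerEntry), the writer's CAS succeeds, the consumer flips and drains before the writer's add() runs, and the late element lands in a queue nobody reads anymore:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class LostEntryDemo {

  // Hypothetical, simplified stand-in for FlippingDataContainerEntry.
  static final class Snapshot {
    final Queue<String> queue;
    final int count;

    Snapshot(Queue<String> queue, int count) {
      this.queue = queue;
      this.count = count;
    }
  }

  // Replays: writer CAS, consumer flip + drain, late writer add(), second flip.
  static List<String> replay() {
    AtomicReference<Snapshot> ref =
        new AtomicReference<>(new Snapshot(new ConcurrentLinkedQueue<>(), 0));
    List<String> consumed = new ArrayList<>();

    // Writer: CAS in an entry that shares the current queue (no copy)...
    Snapshot data = ref.get();
    Snapshot updated = new Snapshot(data.queue, data.count + 1);
    ref.compareAndSet(data, updated);
    // ...and is descheduled before it calls updated.queue.add("x").

    // Consumer: flips and drains the claimed queue right away (finds nothing).
    drainInto(ref.getAndSet(new Snapshot(new ConcurrentLinkedQueue<>(), 0)), consumed);

    // Writer resumes and appends to the queue the consumer has already drained.
    updated.queue.add("x");

    // A later flip only sees the fresh queue, so "x" is never consumed.
    drainInto(ref.getAndSet(new Snapshot(new ConcurrentLinkedQueue<>(), 0)), consumed);
    return consumed;
  }

  private static void drainInto(Snapshot claimed, List<String> out) {
    for (String item; (item = claimed.queue.poll()) != null; ) {
      out.add(item);
    }
  }

  public static void main(String[] args) {
    System.out.println(replay());  // prints "[]"
  }
}
```

Both flips come back empty, even though the first claimed snapshot reports one item.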

The actual question is: can the copying of the backing queue be avoided, for a performance boost, while still updating the queue's content atomically with lock-free algorithms, and therefore without losing entries midway?

Upvotes: 1

Views: 339

Answers (2)

Peter Cordes

Reputation: 364532

"I need to copy the actual queue on each write attempt"

Your idea sounds like RCU (https://en.wikipedia.org/wiki/Read-copy-update). Java being garbage-collected makes RCU a lot easier by solving the deallocation problem for you (I think).

If I understand correctly from a quick skim of your question, your "readers" actually want to "claim" the whole current contents of the container for themselves. That makes them effectively writers, too, but instead of read+copy they can just construct an empty container, and atomically exchange the top-level reference to point to that. (Thus claiming the old container for exclusive access.)

A big benefit to RCU is that the container data structure itself doesn't have to be Atomic all over the place; once you have a reference to it, nobody else is modifying it.


The only tricky part comes when a writer wants to add new stuff to a non-empty container. Then you do copy the existing container and modify the copy, and attempt to CAS (compare-exchange, i.e. compareAndSet()) the updated copy into the shared top-level AtomicReference.
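Here is a minimal sketch of that RCU-style scheme, not the asker's exact container: the published snapshot is an immutable java.util.List (in a hypothetical RcuBuffer class, with the byte-length metadata left out), writers copy it and CAS the modified copy in, and the consumer claims everything with a single AtomicReference.getAndSet, which is the atomic exchange described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Minimal RCU-style sketch: writers copy + CAS, the consumer swaps in an empty list.
public class RcuBuffer<E> {

  // Published lists are never mutated after the CAS that publishes them,
  // so they need no internal synchronization; only this reference is atomic.
  private final AtomicReference<List<E>> current =
      new AtomicReference<>(Collections.emptyList());

  // Writer: read, copy, modify the private copy, then try to publish it.
  // A failed CAS leaves shared state untouched, so a retry cannot duplicate.
  public void put(E value) {
    while (true) {
      List<E> old = current.get();
      List<E> copy = new ArrayList<>(old.size() + 1);
      copy.addAll(old);
      copy.add(value);
      if (current.compareAndSet(old, Collections.unmodifiableList(copy))) {
        return;
      }
    }
  }

  // Consumer: one atomic exchange claims the whole current list. Reusing the
  // shared empty list is harmless for the writers' CAS: the same reference
  // always means the same (immutable, empty) contents.
  public List<E> flip() {
    return current.getAndSet(Collections.emptyList());
  }

  // Runs several producers against one spinning consumer and returns how many
  // items the consumer saw in total.
  static int concurrentCount(int producers, int perProducer) {
    RcuBuffer<Integer> buffer = new RcuBuffer<>();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < producers; t++) {
      Thread p = new Thread(() -> {
        for (int i = 0; i < perProducer; i++) {
          buffer.put(i);
        }
      });
      threads.add(p);
      p.start();
    }
    int consumed = 0;
    // Seeing isAlive() == false gives a happens-before edge with the producer's last action.
    while (threads.stream().anyMatch(Thread::isAlive)) {
      consumed += buffer.flip().size();
    }
    return consumed + buffer.flip().size();  // items published after the last flip
  }

  public static void main(String[] args) {
    System.out.println(concurrentCount(4, 1_000));  // 4000: nothing lost or duplicated
  }
}
```

Because a failed CAS only discards a private copy, retries cannot duplicate an item, and because writers never touch a list after publishing it, a flip cannot race with a late append. The price is the O(n) copy on every put, which is exactly the cost the question is trying to avoid.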

A writer can't just unconditionally exchange its copy in, because it might get back a non-empty container and have nowhere to put it. Unless it's OK for a writer to hang on to a batch of work and spin, waiting for a reader to empty the queue...


I'm assuming here that your writers have batches of work to enqueue at once; otherwise RCU is probably too expensive for the writers. Sorry if I missed a detail in your question that rules that out. I don't regularly use Java, so I'm just writing this quickly in case it's helpful.
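If writers do have batches, the copy can be amortized. A sketch under that assumption (hypothetical RcuBatchBuffer class and putAll method, metadata again omitted) copies the current list once per batch instead of once per element:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Batch variant of an RCU-style buffer: one copy + CAS per batch, not per item.
public class RcuBatchBuffer<E> {

  private final AtomicReference<List<E>> current =
      new AtomicReference<>(Collections.emptyList());

  // Each attempt costs O(old.size() + batch.size()); a writer enqueuing k items
  // at once pays that once instead of k times.
  // Assumes the caller does not modify 'batch' while putAll() runs.
  public void putAll(Collection<? extends E> batch) {
    if (batch.isEmpty()) {
      return;
    }
    while (true) {
      List<E> old = current.get();
      List<E> copy = new ArrayList<>(old.size() + batch.size());
      copy.addAll(old);
      copy.addAll(batch);
      if (current.compareAndSet(old, Collections.unmodifiableList(copy))) {
        return;
      }
    }
  }

  // Consumer claims the whole published list with one atomic exchange.
  public List<E> flip() {
    return current.getAndSet(Collections.emptyList());
  }

  public static void main(String[] args) {
    RcuBatchBuffer<String> buffer = new RcuBatchBuffer<>();
    buffer.putAll(List.of("a", "b", "c"));
    buffer.putAll(List.of("d"));
    System.out.println(buffer.flip());  // prints "[a, b, c, d]"
    System.out.println(buffer.flip());  // prints "[]"
  }
}
```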

Upvotes: 0

Malt

Reputation: 30305

First, let's state the obvious: the best solution is to avoid writing any such custom classes. Perhaps something as simple as a java.util.concurrent.LinkedTransferQueue would work just as well, and be less error-prone. And if a LinkedTransferQueue doesn't work, then what about the LMAX Disruptor or something similar? Have you looked at existing solutions?
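As a rough illustration of the off-the-shelf route, producers can simply offer() to a LinkedTransferQueue while the consumer calls drainTo() to take everything currently available. The class and method names below are made up for the example. Note that drainTo() does not take an atomic snapshot the way a flip does: with a single consumer, an item offered during a drain lands either in this batch or the next one, but it is never lost or delivered twice. Metadata such as the key/value byte lengths would have to be computed by the consumer from each drained batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// Producers just offer(); the consumer periodically drains everything available.
public class TransferQueueDrainDemo {

  static int produceAndDrain(int producers, int perProducer) {
    LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < producers; t++) {
      Thread p = new Thread(() -> {
        for (int i = 0; i < perProducer; i++) {
          queue.offer(i);  // unbounded queue: offer() never fails
        }
      });
      threads.add(p);
      p.start();
    }
    List<Integer> batch = new ArrayList<>();
    int consumed = 0;
    while (threads.stream().anyMatch(Thread::isAlive)) {
      // drainTo() moves whatever is currently available into 'batch'
      // and returns how many elements it transferred.
      consumed += queue.drainTo(batch);
      batch.clear();  // a real consumer would process the batch here
    }
    return consumed + queue.drainTo(batch);  // leftovers after all producers finished
  }

  public static void main(String[] args) {
    System.out.println(produceAndDrain(4, 1_000));  // 4000
  }
}
```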

And if you still need/want a custom solution, then I have a sketch of a slightly different approach, one that would avoid the copying:

The idea is for the put operations to spin on some atomic variable, trying to set it. If a thread manages to set it, then it gets exclusive access to the current queue, which means that it can append to it. After appending, it resets the atomic variable to allow other threads to append. It's basically a spin-lock. That way, the contention between the threads occurs before appending to the queue, rather than after.
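A minimal sketch of that spin-lock idea follows, using an AtomicBoolean as the flag. The class name is hypothetical, and one detail is an assumption added here rather than part of the answer: flip() acquires the same flag, which is what prevents the lost-entry race from the question. This trades lock-freedom away; a producer that is descheduled while holding the flag stalls every other thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the spin-lock idea: contention happens on the flag, before appending.
public class SpinLockFlippingBuffer<E> {

  private final AtomicBoolean locked = new AtomicBoolean(false);
  private List<E> buffer = new ArrayList<>();  // only touched while 'locked' is held

  private void lock() {
    while (!locked.compareAndSet(false, true)) {
      Thread.onSpinWait();  // Java 9+ hint that this thread is busy-waiting
    }
  }

  private void unlock() {
    locked.set(false);  // volatile write: publishes the changes made under the lock
  }

  public void put(E value) {
    lock();
    try {
      buffer.add(value);  // a plain ArrayList is fine: access is exclusive
    } finally {
      unlock();
    }
  }

  // Assumption: flip() takes the same lock, so no put() can append to a
  // buffer after the consumer has claimed it.
  public List<E> flip() {
    lock();
    try {
      List<E> claimed = buffer;
      buffer = new ArrayList<>();
      return claimed;
    } finally {
      unlock();
    }
  }

  // Several producers, one spinning consumer; returns the total items consumed.
  static int concurrentCount(int producers, int perProducer) {
    SpinLockFlippingBuffer<Integer> buf = new SpinLockFlippingBuffer<>();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < producers; t++) {
      Thread p = new Thread(() -> {
        for (int i = 0; i < perProducer; i++) {
          buf.put(i);
        }
      });
      threads.add(p);
      p.start();
    }
    int consumed = 0;
    while (threads.stream().anyMatch(Thread::isAlive)) {
      consumed += buf.flip().size();
    }
    return consumed + buf.flip().size();
  }

  public static void main(String[] args) {
    System.out.println(concurrentCount(4, 1_000));  // 4000
  }
}
```

Unlike the copy-on-write approach, each put() is O(1) here; the cost moves into the time threads spend waiting on the flag.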

Upvotes: 1
