Reputation: 1568
One of the books I'm reading is "The Art of Multiprocessor Programming" by Maurice Herlihy and Nir Shavit. In it, there is a "wait-free" queue which (after a bit of language adaptation) holds up perfectly in a threaded environment, both in testing and on paper - at the least, there are no collisions even with 10,000,000 items distributed across 5 threads, and the logic checks out.
(I edited the queue to return false if an item could not be gotten, instead of throwing an exception. The code is given below).
However, it has one caveat: the queue cannot grow. A cursory logic check suggests that it cannot grow without locking the queue - which would somewhat negate the point of having a lock-free queue.
The purpose, then, is to create a lock-free (or at least, starvation-free locking) queue which can grow.
So: if we essentially give each thread its own shared queue, in a way that is not an oxymoron (and accepting the high probability that this has already been solved, and solved better - this is for the purpose of learning by doing):
WaitFreeQueue<Queue<int>> queues = new WaitFreeQueue<Queue<int>>(threadCount);
// Dequeue a queue, enqueue an item, enqueue the queue.
// Dequeue a queue, dequeue an item, enqueue the queue.
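In other words, roughly something like this wrapped around it (just a sketch to illustrate the idea; the GrowableQueue name is made up):

using System.Collections.Generic;

// Sketch only: a growable queue built by storing ordinary (growable) Queue<T>
// instances inside the fixed-size WaitFreeQueue<T> below. Whoever dequeues an
// inner queue "owns" it until they enqueue it back.
public class GrowableQueue<T>
{
    protected WaitFreeQueue<Queue<T>> queues;

    public GrowableQueue(int threadCount)
    {
        queues = new WaitFreeQueue<Queue<T>>(threadCount);
        for (int i = 0; i < threadCount; i++)
            queues.Enqueue(new Queue<T>());
    }

    public bool Enqueue(T value)
    {
        Queue<T> q;
        if (!queues.Dequeue(out q))      // Dequeue a queue,
            return false;                // (no inner queue free right now)
        q.Enqueue(value);                // enqueue an item,
        return queues.Enqueue(q);        // enqueue the queue.
    }

    public bool Dequeue(out T value)
    {
        Queue<T> q;
        if (!queues.Dequeue(out q))      // Dequeue a queue,
        {
            value = default(T);
            return false;
        }
        bool got = q.Count > 0;
        value = got ? q.Dequeue() : default(T);  // dequeue an item,
        queues.Enqueue(q);                       // enqueue the queue.
        return got;
    }
}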
And the wait-free queue (Previous code included in comments in case I make any breaking changes):
/// <summary>
/// A wait-free queue for use in threaded environments.
/// Significantly adapted from "The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit".
/// </summary>
/// <typeparam name="T">The type of item in the queue.</typeparam>
public class WaitFreeQueue<T>
{
/// <summary>
/// The index to dequeue from.
/// </summary>
protected int head;
/// <summary>
/// The index to queue to.
/// </summary>
protected int tail;
/// <summary>
/// The array to queue in.
/// </summary>
protected T[] items;
/// <summary>
/// The number of items queued.
/// </summary>
public int Count
{
get { return tail - head; }
}
/// <summary>
/// Creates a new wait-free queue.
/// </summary>
/// <param name="capacity">The capacity of the queue.</param>
public WaitFreeQueue(int capacity)
{
items = new T[capacity];
head = 0; tail = 0;
}
/// <summary>
/// Attempts to enqueue an item.
/// </summary>
/// <param name="value">The item to enqueue.</param>
/// <returns>Returns false if there was no room in the queue.</returns>
public bool Enqueue(T value)
{
if (tail - head == items.Length)
// throw new IndexOutOfRangeException();
return false;
items[tail % items.Length] = value;
System.Threading.Interlocked.Increment(ref tail);
return true;
// tail++;
}
/// <summary>
/// Attempts to dequeue an item.
/// </summary>
/// <param name="r">The variable to dequeue to.</param>
/// <returns>Returns true if there was an available item to dequeue.</returns>
public bool Dequeue(out T r)
{
if (tail - head == 0)
// throw new InvalidOperationException("No more items.");
{ r = default(T); return false; }
r = items[head % items.Length];
System.Threading.Interlocked.Increment(ref head);
// head++;
// return r;
return true;
}
}
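The kind of multi-threaded exercise I mean is roughly this (an illustrative sketch, not my exact test code; the capacity and the per-thread counts are arbitrary):

using System.Threading;

// Illustrative only: several producers and consumers hammer one shared
// WaitFreeQueue<int>, spinning when it is full or empty.
class QueueExercise
{
    const int Producers = 5;
    const int ItemsPerProducer = 2000000;       // 10,000,000 items in total
    static WaitFreeQueue<int> queue = new WaitFreeQueue<int>(1 << 20);
    static int consumed = 0;

    static void Main()
    {
        var threads = new Thread[Producers * 2];
        for (int i = 0; i < Producers; i++)
        {
            threads[i] = new Thread(Produce);
            threads[Producers + i] = new Thread(Consume);
        }
        foreach (var t in threads) t.Start();
        foreach (var t in threads) t.Join();
    }

    static void Produce()
    {
        for (int i = 0; i < ItemsPerProducer; i++)
            while (!queue.Enqueue(i)) { }       // spin until there is room
    }

    static void Consume()
    {
        int value;
        while (Volatile.Read(ref consumed) < Producers * ItemsPerProducer)
            if (queue.Dequeue(out value))
                Interlocked.Increment(ref consumed);
    }
}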
So: Would this work? If not, why? If so, are there still any foreseeable problems?
Thanks.
Upvotes: 2
Views: 153
Reputation: 19020
The code you posted is not thread safe, for either enqueue or dequeue.
Enqueue:
- Start with head = 0 and tail = 0.
- Thread 1 calls Enqueue and gets interrupted after items[tail % items.Length] = value; but before executing the interlocked increment. It has now written a value into items[0] but not incremented tail.
- Thread 2 calls Enqueue and executes the whole method. tail is still 0, so the value written by the first thread is replaced: the second thread writes its value into items[0] as well.
Dequeue:
- Start with two items queued, in items[0] and items[1], so head = 0 and tail = 2.
- Thread 1 calls Dequeue and gets interrupted after r = items[head % items.Length]; but before executing the interlocked increment.
- Thread 2 calls Dequeue and returns the same item, because head is still 0.
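To be explicit about what a fix has to involve: the claim on an index must happen atomically, before the index is used. A rough sketch for the dequeue side only (a drop-in for the Dequeue method in the question; it is not the book's algorithm and it is still not a complete fix, since enqueue has the mirror-image problem):

// Sketch only: claim the head index with a compare-and-swap so that two
// dequeuers can never return the same slot.
public bool Dequeue(out T r)
{
    while (true)
    {
        int currentHead = head;
        int currentTail = tail;
        if (currentTail - currentHead == 0)
        {
            r = default(T);
            return false;                       // queue appears empty
        }
        r = items[currentHead % items.Length];  // read the candidate item
        // Atomically advance head from currentHead to currentHead + 1.
        // Only one thread can win this race for a given value of head.
        if (System.Threading.Interlocked.CompareExchange(
                ref head, currentHead + 1, currentHead) == currentHead)
            return true;
        // Another thread dequeued this slot first; retry with the new head.
    }
}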
Upvotes: 2
Reputation: 244878
Trying to write lock-free multi-threaded code is hard, and you should either leave it to people who know it better than you or me (and use e.g. ConcurrentQueue<T>), or not write it at all (and use locks), if possible.
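For reference, the ConcurrentQueue<T> route looks roughly like this (a minimal sketch; the producer/consumer split and the counts are made up):

using System.Collections.Concurrent;
using System.Threading.Tasks;

// Minimal sketch of using the built-in ConcurrentQueue<T> instead.
class ConcurrentQueueSketch
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // Producer: Enqueue never fails, because the queue grows as needed.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 1000000; i++)
                queue.Enqueue(i);
        });

        // Consumer: TryDequeue returns false when the queue is currently empty.
        var consumer = Task.Run(() =>
        {
            int item;
            while (!producer.IsCompleted || !queue.IsEmpty)
            {
                if (queue.TryDequeue(out item))
                {
                    // ... use item ...
                }
            }
        });

        Task.WaitAll(producer, consumer);
    }
}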
That being said, there are several problems with your code:
- With your queue-of-queues approach, items don't come out in FIFO order: if threadCount is 2, and you enqueue items 1, 2 and 3, one after another, and then you dequeue, you get item 2!
- You can't first use the value of a field and then call Interlocked.Increment() on it like you do. Imagine something like this:
items[tail % items.Length] = value;   // thread 1 writes its value into slot tail
items[tail % items.Length] = value;   // thread 2 writes into the very same slot
Interlocked.Increment(ref tail);      // thread 1 increments tail
Interlocked.Increment(ref tail);      // thread 2 increments tail again
Now, both threads enqueued to the same position and the position after that didn't change. This is wrong.
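One way around that particular problem is to use the value returned by Interlocked.Increment(), so that every caller gets a distinct index. A rough sketch only (it ignores the full-queue check, and a reader can still see the new tail before the value is actually stored, so this alone does not make the queue correct):

// Sketch only: claim a slot by incrementing first and using the returned value.
public bool Enqueue(T value)
{
    // Interlocked.Increment returns the incremented value, so subtracting 1
    // gives this caller a slot index that no other caller can also get.
    int myIndex = System.Threading.Interlocked.Increment(ref tail) - 1;
    items[myIndex % items.Length] = value;
    // Caveats: no full-queue check here, and a concurrent Dequeue can observe
    // the new tail before the line above has stored the value.
    return true;
}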
Upvotes: 5