Reputation: 34820
We have implemented a message queue by using C# Queue
. We know we only have ONE consumer to take available message out from the queue for processing with a while
loop. We also know there is only ONE producer to put message onto the queue.
We have a lock
on above message queue to make sure the consumer and producer cannot access the queue simultaneously.
My question is is that lock
necessary? If the Queue
increase its Count
property AFTER an item is actually added and if the consumer check the Count
before retrieve, the consumer should get a complete message item even we don't have that lock
. Right? So we will not face a partial message item issue. Then we can get rid of that lock
?
That lock
will slow down the system and occasionally we can see the retrieve thread is blocked for a while because we have a very heavy producer.
EDIT:
Unfortunately we are using .Net 3.5.
Upvotes: 5
Views: 1033
Reputation: 64248
BTW, Lockless queues for a single reader and a single writer are fairly easy to write. This is a very primitive example of the concept, but it does the job:
class LocklessQueue<T>
{
class Item
{
public Item Next;
bool _valid;
T _value;
public Item(bool valid, T value)
{
_valid = valid;
_value = value;
Next = null;
}
public bool IsValid { get { return _valid; } }
public T TakeValue()
{
T value = _value;
_valid = false;
_value = default(T);
return value;
}
}
Item _first;
Item _last;
public LocklessQueue()
{
_first = _last = new Item(false, default(T));
}
public bool IsEmpty
{
get
{
while (!_first.IsValid && _first.Next != null)
_first = _first.Next;
return false == _first.IsValid;
}
}
public void Enqueue(T value)
{
Item i = new Item(true, value);
_last.Next = i;
_last = i;
}
public T Dequeue()
{
while (!_first.IsValid && _first.Next != null)
_first = _first.Next;
if (IsEmpty)
throw new InvalidOperationException();//queue is empty
return _first.TakeValue();
}
}
Upvotes: 1
Reputation: 64248
No, this won't work consistently... why?
Let's disassemble the two methods we are going to be calling concurrently from two threads (one read & one writer):
public T Dequeue()
{
if (this._size == 0)
{
ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue);
}
T local = this._array[this._head];
this._array[this._head] = default(T);
this._head = (this._head + 1) % this._array.Length;
this._size--;
this._version++;
return local;
}
public void Enqueue(T item)
{
if (this._size == this._array.Length)
{
int capacity = (int) ((this._array.Length * 200L) / 100L);
if (capacity < (this._array.Length + 4))
{
capacity = this._array.Length + 4;
}
this.SetCapacity(capacity);
}
this._array[this._tail] = item;
this._tail = (this._tail + 1) % this._array.Length;
this._size++;
this._version++;
}
Given the above code three of the variables are safe if (and only if) there is adequate capacity in the queue. the fields for _array, _head, and _tail are either unmodified or modified in only one of the two methods above.
The reason you cannot remove the lock() is that both methods modify _size and _version. Though arguably the collision on _version could be ignored, the collision on _size would cause some undesired and unpredictable behavior.
Upvotes: 1
Reputation: 134095
ConcurrentQueue
is available even if you're using .NET 3.5. The Reactive Extensions includes what used to be the Parallel Extensions to .NET 3.5--the precursor to the Task Parallel Library that is included in .NET 4.0.
Upvotes: 0
Reputation: 7778
You should be Locking; the class isn't threadsafe. If you're using the Queue
in System.Collections
, there is a thread-safe Queue
handy (System.Collections.Queue.Synchronized()
returns such a Queue
). Otherwise, be sure to use the object provided Queue<T>.SyncRoot
to synchronize:
using System.Collections.Generic;
public static class Q_Example
{
private readonly Queue<int> q = new Queue<int>();
public void Method1(int val)
{
lock(q.SyncRoot)
{
q.EnQueue(val);
}
}
public int Method2()
{
lock(q.SyncRoot)
{
return q.Dequeue();
}
}
}
Upvotes: 0
Reputation: 564811
The lock is necessary, if you're using Queue<T>
. You could easily remove this by replacing that with ConcurrentQueue<T>
, however, you may want to consider simplifying this code by replacing it with a BlockingCollection<T>
.
This would allow your consumer to eliminate the lock and while checking, and just do a single foreach on collection.GetConsumingEnumerable()
. The producer can eliminate the lock and add items as needed. It also would easily allow you to scale up to using multiple producers, since you mentioned you have a "very heavy producer" at the moment.
Upvotes: 4
Reputation: 38484
The real problem is that internal data structures of the queue may be being mutated whilst an enqueue or dequeue is happening, and during this period the data structure is in an indeterminate state from the point-of-view of another thread.
For example, an enqueue may require internal data structures to be expanded, where a new structure is created, and old items are copied form an old structure to a new structure. There will be many steps involved in this process where at any time it would be dangerous for another thread to access the queue as the operations are not complete.
Therefore, during enqueue\dequeue you have to lock to make these operation appear to be logically atomic.
You may try the new ConcurrentQueue class in .Net 4.0 as this possibly has better performance characteristics as it uses a non-locking algorithm.
Upvotes: 10