5YrsLaterDBA

Reputation: 34820

message queue thinking

We have implemented a message queue using the C# Queue class. We know there is only ONE consumer taking available messages off the queue for processing in a while loop. We also know there is only ONE producer putting messages onto the queue.

We have a lock on the message queue to make sure the consumer and producer cannot access the queue simultaneously.

My question is: is that lock necessary? If the Queue increments its Count property AFTER an item is actually added, and the consumer checks Count before retrieving, then the consumer should get a complete message item even without the lock. Right? So we would not face a partial-message problem. Can we then get rid of the lock?

That lock slows down the system, and occasionally we can see the retrieval thread blocked for a while because we have a very heavy producer.

EDIT:

Unfortunately we are using .Net 3.5.

Upvotes: 5

Views: 1033

Answers (7)

csharptest.net

Reputation: 64248

BTW, lock-free queues for a single reader and a single writer are fairly easy to write. This is a very primitive example of the concept, but it does the job:

// Safe only with exactly one producer thread (calling Enqueue) and
// exactly one consumer thread (calling IsEmpty/Dequeue), since the
// consumer side mutates _first.
class LocklessQueue<T>
{
    class Item
    {
        public Item Next;
        bool _valid;
        T _value;
        public Item(bool valid, T value)
        {
            _valid = valid;
            _value = value;
            Next = null;
        }
        public bool IsValid { get { return _valid; } }
        public T TakeValue()
        {
            T value = _value;
            _valid = false;
            _value = default(T);
            return value;
        }
    }

    Item _first;
    Item _last;

    public LocklessQueue()
    {
        _first = _last = new Item(false, default(T));
    }

    public bool IsEmpty
    { 
        get
        {
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;
            return false == _first.IsValid;
        }
    }

    public void Enqueue(T value)
    {
        Item i = new Item(true, value);
        _last.Next = i;
        _last = i;
    }

    public T Dequeue()
    {
        while (!_first.IsValid && _first.Next != null)
            _first = _first.Next;

        if (IsEmpty)
            throw new InvalidOperationException();//queue is empty

        return _first.TakeValue();
    }
}
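A minimal usage sketch of the class above (assuming the `LocklessQueue<T>` from this answer is in scope; the item counts are arbitrary). Note that `IsEmpty` and `Dequeue` are only ever called from the single consumer thread, and `Enqueue` only from the single producer thread:

```csharp
using System;
using System.Threading;

class Program
{
    static void Main()
    {
        var queue = new LocklessQueue<int>();

        // Exactly one producer thread calls Enqueue...
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);
        });

        // ...and exactly one consumer thread calls IsEmpty/Dequeue.
        // Both of those advance the _first pointer, so they must not
        // be called from any other thread.
        int received = 0;
        var consumer = new Thread(() =>
        {
            while (received < 1000)
            {
                if (!queue.IsEmpty)
                {
                    queue.Dequeue();
                    received++;
                }
            }
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
        Console.WriteLine("received {0} items", received);
    }
}
```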

Upvotes: 1

csharptest.net

Reputation: 64248

No, this won't work consistently... why?

Let's disassemble the two methods we would be calling concurrently from two threads (one reader and one writer):

public T Dequeue()
{
    if (this._size == 0)
    {
        ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue);
    }
    T local = this._array[this._head];
    this._array[this._head] = default(T);
    this._head = (this._head + 1) % this._array.Length;
    this._size--;
    this._version++;
    return local;
}

public void Enqueue(T item)
{
    if (this._size == this._array.Length)
    {
        int capacity = (int) ((this._array.Length * 200L) / 100L);
        if (capacity < (this._array.Length + 4))
        {
            capacity = this._array.Length + 4;
        }
        this.SetCapacity(capacity);
    }
    this._array[this._tail] = item;
    this._tail = (this._tail + 1) % this._array.Length;
    this._size++;
    this._version++;
}

Given the above code, three of the fields are safe if (and only if) there is adequate capacity in the queue: _array, _head, and _tail are each either unmodified or modified in only one of the two methods above.

The reason you cannot remove the lock() is that both methods modify _size and _version. Though arguably the collision on _version could be ignored, the collision on _size would cause some undesired and unpredictable behavior.
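The point above can be demonstrated in isolation. This sketch (not the Queue<T> source; just a plain shared counter standing in for the private `_size` field) shows the lost-update problem when one thread increments and another decrements without a lock:

```csharp
using System;
using System.Threading;

class LostUpdateDemo
{
    // Stands in for Queue<T>'s private _size field.
    static int _size;

    static void Main()
    {
        // One thread does _size++ (like Enqueue), the other does
        // _size-- (like Dequeue). Each is a read-modify-write of the
        // same field, so interleavings can lose updates.
        var inc = new Thread(() => { for (int i = 0; i < 1000000; i++) _size++; });
        var dec = new Thread(() => { for (int i = 0; i < 1000000; i++) _size--; });
        inc.Start(); dec.Start();
        inc.Join(); dec.Join();

        // With equal numbers of increments and decrements the result
        // "should" be 0, but without a lock (or Interlocked.Increment/
        // Decrement) it usually is not.
        Console.WriteLine(_size);
    }
}
```

Once `_size` drifts away from the true item count, Dequeue's empty check and Enqueue's capacity check both make decisions based on a wrong value, which is where the unpredictable behavior comes from.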

Upvotes: 1

Jim Mischel

Reputation: 134095

ConcurrentQueue is available even if you're using .NET 3.5. The Reactive Extensions include what used to be the Parallel Extensions for .NET 3.5, the precursor to the Task Parallel Library that is included in .NET 4.0.

Upvotes: 0

lmat - Reinstate Monica

Reputation: 7778

You should be locking; the class isn't thread-safe. If you're using the non-generic Queue in System.Collections, there is a thread-safe wrapper handy (System.Collections.Queue.Synchronized() returns one). Otherwise, be sure to synchronize on the SyncRoot object the queue provides (on Queue<T> it is an explicit ICollection member, so a cast is needed):

using System.Collections;
using System.Collections.Generic;

public class Q_Example
{
    private readonly Queue<int> q = new Queue<int>();

    public void Method1(int val)
    {
        // SyncRoot is implemented explicitly on Queue<T>,
        // so it is only reachable through ICollection.
        lock (((ICollection)q).SyncRoot)
        {
            q.Enqueue(val);
        }
    }

    public int Method2()
    {
        lock (((ICollection)q).SyncRoot)
        {
            return q.Dequeue();
        }
    }
}

Upvotes: 0

Reed Copsey

Reputation: 564811

The lock is necessary if you're using Queue<T>. You could remove it by switching to ConcurrentQueue<T>; however, you may want to consider simplifying this code by replacing it with a BlockingCollection<T>.

This would let your consumer eliminate the lock and the while-loop checking, and just do a single foreach over collection.GetConsumingEnumerable(). The producer can eliminate the lock and add items as needed. It would also easily let you scale up to multiple producers, since you mention you have a "very heavy producer" at the moment.
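A sketch of the pattern this answer describes (BlockingCollection<T> requires .NET 4; the message type and counts here are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static void Main()
    {
        var messages = new BlockingCollection<string>();

        // Producer: Add is thread-safe, so no explicit lock is needed.
        // Multiple producer threads could call Add on the same collection.
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 10; i++)
                messages.Add("message " + i);
            messages.CompleteAdding(); // tells the consumer no more items are coming
        });
        producer.Start();

        // Consumer: a single foreach replaces the lock + while/Count loop.
        // GetConsumingEnumerable blocks while the collection is empty and
        // ends once adding is complete and the collection drains.
        foreach (var msg in messages.GetConsumingEnumerable())
            Console.WriteLine(msg);

        producer.Join();
    }
}
```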

Upvotes: 4

Matěj Zábský

Reputation: 17272

See ConcurrentQueue.

Upvotes: 2

Tim Lloyd

Reputation: 38484

The real problem is that internal data structures of the queue may be being mutated whilst an enqueue or dequeue is happening, and during this period the data structure is in an indeterminate state from the point-of-view of another thread.

For example, an enqueue may require internal data structures to be expanded: a new structure is created, and the old items are copied from the old structure to the new one. This process involves many steps, and at any point during it, it would be dangerous for another thread to access the queue, as the operations are not complete.

Therefore, during enqueue/dequeue you have to lock to make these operations appear logically atomic.

You may try the new ConcurrentQueue class in .NET 4.0, as it likely has better performance characteristics: it uses a non-locking algorithm.
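With ConcurrentQueue<T> the "check Count, then Dequeue" pattern from the question also goes away: TryDequeue combines the test and the removal into one atomic operation. A sketch (item counts are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // Producer thread: Enqueue is safe to call without a lock.
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 100; i++)
                queue.Enqueue(i);
        });
        producer.Start();

        // Consumer: no Count check followed by Dequeue; TryDequeue
        // atomically removes an item if one is available.
        int taken = 0;
        while (taken < 100)
        {
            int item;
            if (queue.TryDequeue(out item))
                taken++;
        }

        producer.Join();
        Console.WriteLine("consumed {0} items", taken);
    }
}
```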

Upvotes: 10
