Benni

Reputation: 1060

C#: Locking a Queue properly for Iteration

I am using a Queue (C#) to store data that has to be sent to every client that connects.

My lock object is private readonly:

private readonly object completedATEQueueSynched = new object();

Only two methods enqueue:

1) Started by mouse movement, executed by the main form's thread:

public void handleEddingToolMouseMove(MouseEventArgs e)
{
    AbstractTrafficElement de = new...
    sendElementToAllPlayers(de);
    lock (completedATEQueueSynched)
    {
       completedATEQueue.Enqueue(de);
    }
}

2) Started by a button event, also executed by the main form's thread (this does not matter here, but better safe than sorry):

public void handleBLC(EventArgs e)
{
    AbstractTrafficElement de = new...
    sendElementToAllPlayers(de);
    lock (completedATEQueueSynched)
    {
         completedATEQueue.Enqueue(de);
    }
}

This method is called by the thread responsible for a specific connected client:

private void sendSetData(TcpClient c)
{
    NetworkStream clientStream = c.GetStream();
    lock (completedATEQueueSynched)
    {
        foreach (AbstractTrafficElement ate in MainForm.completedATEQueue)
        {
            binaryF.Serialize(clientStream, ate);
        }
    }
}

If a client connects while I am moving the mouse, a deadlock occurs. If I lock only the iteration, an InvalidOperationException is thrown because the queue changed.

I have tried the synchronized Queue wrapper as well, but it doesn't work for iterating (even in combination with locks). Any ideas? I just can't find my mistake.

Upvotes: 2

Views: 733

Answers (3)

GSerjo

Reputation: 4778

It looks like ConcurrentQueue is what you want.

UPDATE

Yes, it works fine. TryDequeue uses Interlocked.CompareExchange and SpinWait internally. A lock is not a good choice here because it is too expensive; take a look at SpinLock, and don't forget about Data Structures for Parallel Programming.

Here is Enqueue from ConcurrentQueue. As you can see, only SpinWait and Interlocked.Increment are used. It looks pretty nice.

public void Enqueue(T item)
{
  SpinWait spinWait = new SpinWait();
  while (!this.m_tail.TryAppend(item, ref this.m_tail))
    spinWait.SpinOnce();
}

  internal void Grow(ref ConcurrentQueue<T>.Segment tail)
  {
    this.m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
    tail = this.m_next;
  }

  internal bool TryAppend(T value, ref ConcurrentQueue<T>.Segment tail)
  {
    if (this.m_high >= 31)
      return false;
    int index = 32;
    try
    {
    }
    finally
    {
      index = Interlocked.Increment(ref this.m_high);
      if (index <= 31)
      {
        this.m_array[index] = value;
        this.m_state[index] = 1;
      }
      if (index == 31)
        this.Grow(ref tail);
    }
    return index <= 31;
  }

Upvotes: 2

Ankush

Reputation: 2554

Henk Holterman's approach is good if the rate of enqueueing and dequeueing is not very high. Here I think you are capturing mouse movements. If you expect to generate a lot of data in the queue, that approach is not ideal: the lock becomes a point of contention between the network code and the enqueueing code, because its granularity is the whole queue.

In this case I'd recommend what GSerjo mentioned: ConcurrentQueue. I've looked into its implementation, and it is very granular: it operates at the level of a single element in the queue. While one thread is dequeueing, other threads can enqueue in parallel without blocking.
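
For the iteration case from the question, a minimal sketch of how this might look. It relies on the documented behaviour that enumerating a ConcurrentQueue<T> walks a moment-in-time snapshot and is safe alongside concurrent writes. The class and method names (ConcurrentQueueDemo, CountWhileEnqueueing) are made up for illustration, and int stands in for AbstractTrafficElement:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConcurrentQueueDemo
{
    // Hypothetical helper: enumerates the queue while another task keeps
    // enqueueing, and returns how many items the enumeration saw.
    public static int CountWhileEnqueueing()
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < 1000; i++)
            queue.Enqueue(i);

        // Producer: stands in for the mouse-move handler.
        Task producer = Task.Run(() =>
        {
            for (int i = 1000; i < 2000; i++)
                queue.Enqueue(i);
        });

        // Consumer: stands in for sendSetData. No lock is taken; foreach
        // works on a snapshot, so concurrent Enqueue calls do not cause
        // an InvalidOperationException.
        int seen = 0;
        foreach (int item in queue)
            seen++;

        producer.Wait();
        return seen;
    }

    public static void Main()
    {
        int seen = CountWhileEnqueueing();
        // How many items the snapshot contains depends on timing.
        Console.WriteLine(seen >= 1000 && seen <= 2000);
    }
}
```

The enumeration does not include items enqueued after the snapshot was taken, so the same timing question applies as with a manual copy under a lock.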

Upvotes: 1

Henk Holterman

Reputation: 273264

You can reduce the contention, probably enough to make it acceptable:

private void sendSetData(TcpClient c)
{
    IEnumerable<AbstractTrafficElement> list;

    lock (completedATEQueueSynched)
    {
        list = MainForm.completedATEQueue.ToList();  // take a snapshot (ToList needs "using System.Linq;")
    }

    NetworkStream clientStream = c.GetStream();
    foreach (AbstractTrafficElement ate in list)
    {
        binaryF.Serialize(clientStream, ate);
    }
}

But of course a snapshot introduces its own bit of timing logic. What exactly does 'all elements' mean at any given moment?
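
To make that timing question concrete, here is a small self-contained sketch. The names SnapshotDemo, Add and TakeSnapshot are hypothetical and not from the question, and string stands in for AbstractTrafficElement. An element enqueued after the copy was taken is simply not part of that copy:

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotDemo
{
    private static readonly object sync = new object();
    private static readonly Queue<string> queue = new Queue<string>();

    // Enqueue under the lock, like the two handlers in the question.
    public static void Add(string item)
    {
        lock (sync)
        {
            queue.Enqueue(item);
        }
    }

    // Copy the queue while holding the lock; the caller can then
    // iterate the copy without holding the lock.
    public static string[] TakeSnapshot()
    {
        lock (sync)
        {
            return queue.ToArray();
        }
    }

    public static void Main()
    {
        Add("a");
        Add("b");
        string[] snapshot = TakeSnapshot();
        Add("c");  // arrives after the snapshot was taken

        Console.WriteLine(snapshot.Length);        // 2
        Console.WriteLine(TakeSnapshot().Length);  // 3
    }
}
```

Whether a newly connected client still receives "c" depends on whether it is already covered by sendElementToAllPlayers at that moment, which the question does not show.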

Upvotes: 3
