user352891
user352891

Reputation: 1191

Dequeueing objects from a ConcurrentQueue in C#

Hey, I'm trying to implement a ConcurrentQueue in C# for an asynchronous server. Items are being queued as soon as a complete message is received. To dequeue messages, I'm making a small number of threads to do the work of dequeueing and servicing the requests. This is inadequate as each thread is using a while loop, which consumes rather a large amount of processor time, for obvious reasons.

Would anyone know of a method of dequeueing messages when required but not consuming so much processing time.

{
    ...

    for (int i = 0; i < 3; i++)
    {
        Thread t = new Thread(new ThreadStart(startParsingMessages));
        t.Start();
    }

    ...
}

private void startParsingMessages()
{
    QueueContainer dequeued = null;
    Console.WriteLine("Trying");
    while (true)
    {
        if (queue.TryDequeue(out dequeued))
        {
            Console.WriteLine("processing queue");
            ProcessMessage(dequeued.socket, dequeued.message);
        }
    }
}

Upvotes: 12

Views: 33224

Answers (4)

Dwayne
Dwayne

Reputation: 21

The BlockingCollection solution is great. Otherwise, simply sleeping is probably good enough for most cases. It introduces a small delay in response time, and it does consume a small amount of processing time. The advantage is a more robust system that's easier to understand and less likely to fail:

private void startParsingMessages()
{
    QueueContainer dequeued = null;
    Console.WriteLine("Trying");
    while (true)
    {
        //As long as there is data, process it as fast as possible.
        while(queue.TryDequeue(out dequeued))
        {
            Console.WriteLine("processing queue");
            ProcessMessage(dequeued.socket, dequeued.message);
        }
        //No more data, wait 100 ms before checking again
        Thread.Sleep(100);
    }
}

Upvotes: 2

Jon Skeet
Jon Skeet

Reputation: 1500675

Instead of using ConcurrentQueue<T> directly, have you tried wrapping it in a BlockingCollection<T>? Then you can use TryTake(out T, TimeSpan) etc. I believe that's the expected use: that the concurrent collections themselves would usually be there just so you can select how the blocking collection will work.

That doesn't have to be the only use for these collections of course, but particularly for ConcurrentQueue<T>, the producer/consumer queue scenario is the most common one - at which point BlockingCollection<T> is the way to make it easy to do the right thing.

Upvotes: 25

Joshua
Joshua

Reputation: 8212

You could use a static locking object for the threads to wait on and then pulse it when something is ready to be processed.

static readonly object queueLock = new object();

// Threads that enqueue an object
void QueueMessage() {
  lock (queueLock) {
    queue.Enqueue(obj);
    Monitor.Pulse(queueLock);
  }
}
// Thread dequeuer
private void startParsingMessages() {
  QueueContainer dequeued = null;
  Console.WriteLine("Trying");
  while (true) {
    lock(queueLock) {
      if (!queue.TryDequeue(out dequeued)) {
        Console.WriteLine("No object to dequeue, waiting...");
        // Threads will wait here and only one will be released when .Pulse()d
        Monitor.Wait(queueLock);
        dequeued = queue.Dequeue();
      }
      if (dequeued != null) {
        Console.WriteLine("processing queue");
        ProcessMessage(dequeued.socket, dequeued.message);
      }
    }
  }
}

Keep in mind that this can get quite complicated with the more branches you have, but the gist of it is, you lock on a common object and call Monitor.Wait to wait on an object to be Pulsed. When this happens only one thread that was waiting on it will be released. If you enqueue a lot of objects and want them all to go, you can call Monitor.PulseAll which will release all threads that were waiting on the object.

Upvotes: 3

btlog
btlog

Reputation: 4780

There is a pretty well know pattern in concurrent programming called Parallel Producer/Consumer pattern. This article might help resolve some of your issues. http://blogs.msdn.com/b/csharpfaq/archive/2010/08/12/blocking-collection-and-the-producer-consumer-problem.aspx

Upvotes: 6

Related Questions