Test Account
Test Account

Reputation: 39

How to enqueue a task in a dynamic queue

I need to create a queue for different tasks. Currently this is done with a customized version of the example provided by http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse:

using System;
using System.Threading;
using System.Collections.Generic;

public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();

  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];

    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }

  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
      EnqueueItem (null);

    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }

  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }

  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

With the main method:

static void Main()
{
  PCQueue q = new PCQueue (2);

  Console.WriteLine ("Enqueuing 10 items...");

  for (int i = 0; i < 10; i++)
  {
    int itemNumber = i;      // To avoid the captured variable trap
    q.EnqueueItem (() =>
    {
      Thread.Sleep (1000);          // Simulate time-consuming work
      Console.Write (" Task" + itemNumber);
    });
  }

  q.Shutdown (true);
  Console.WriteLine();
  Console.WriteLine ("Workers complete!");
}

However while browsing the stackoverflow I stumbled over this modified version:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Project
{
    /// <summary>
    /// Description of Multithread.
    /// </summary>
     public class Multithread<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public void TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                System.Diagnostics.Debug.WriteLine(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

Which seems to provide a better usability. However I can't find out how to properly add a task to this queue.

classname testclass = new classname();
Multithread<classname> testthread = new Multithread<classname>();

I thought it would be something along the lines:

testthread.EnqueueTask(testclass.functioname());

However it doesn't seem to work. I'm stuck on this problem and couldn't find any help to this problem elsewhere.

Upvotes: 1

Views: 2910

Answers (2)

Brian Gideon
Brian Gideon

Reputation: 48949

You can simplify this a lot by using BlockingCollection. This data structure is implemented as a queue that already encapsulates the producer-consumer logic.

public class PCQueue
{
  private Thread[] workers;
  private BlockingCollection<Action> queue = new BlockingCollection<Action>();
  private CancellationTokenSource cts = new CancellationTokenSource();

  public PCQueue(int workerCount)
  {
    workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
      workers[i] = new Thread(Run);
      workers[i].Start();
    }
  }

  public void Shutdown(bool waitForWorkers)
  {
    cts.Cancel();
    if (waitForWorkers)
    {
      foreach (Thread thread in workers)
      {
        thread.Join();
      }
    }
  }

  public void EnqueueItem(Action action)
  {
    queue.Add(action);
  }

  private void Consumer()
  {
    while (true)
    {
      Action action = queue.Take(cts.Token);
      try
      {
        if (action != null) action();
      }
      catch (Exception caught)
      {
        // Notify somebody that something bad happened.
      }
    }
  }
}

Upvotes: 1

Martin Liversage
Martin Liversage

Reputation: 106826

I do not see how Multithread provides better usability as it appears to a demonstration on how to implement the producer/consumer pattern in a generic way by not really deciding how to actually consume items. On the other hand PCQueue works with actions allowing it to actually consume items.

To modify Multithread to allow it do some work you can remove the generic type parameter T and replace all occurrences of T by Action. In the Consume method you then need to replace the code

System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000);              // Simulate time-consuming task

by

task();

To enqueue a task you should do it exactly as it is done when using PCQueue by providing an Action. You can use a lambda expression for that.

Upvotes: 1

Related Questions