Reputation: 39
I need to create a queue for different tasks. Currently this is done with a customized version of the example provided by http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse:
using System;
using System.Threading;
using System.Collections.Generic;
/// <summary>
/// A producer/consumer queue: callers enqueue <see cref="Action"/> delegates and a
/// fixed set of dedicated worker threads dequeues and runs them.
/// </summary>
public class PCQueue
{
    // Guards _pending and is the object workers wait on / producers pulse.
    private readonly object _gate = new object();
    private readonly Queue<Action> _pending = new Queue<Action>();
    private Thread[] _threads;

    /// <summary>Creates the queue and immediately starts the worker threads.</summary>
    /// <param name="workerCount">Number of worker threads to create and start.</param>
    public PCQueue(int workerCount)
    {
        _threads = new Thread[workerCount];

        for (int index = 0; index < workerCount; index++)
        {
            Thread worker = new Thread(Consume);
            _threads[index] = worker;
            worker.Start();
        }
    }

    /// <summary>
    /// Asks every worker to exit by enqueuing one null item per worker; items that
    /// were enqueued earlier are still dequeued first.
    /// </summary>
    /// <param name="waitForWorkers">
    /// When true, blocks until every worker thread has terminated.
    /// </param>
    public void Shutdown(bool waitForWorkers)
    {
        // One null sentinel per worker: each worker consumes exactly one and returns.
        for (int sent = 0; sent < _threads.Length; sent++)
        {
            EnqueueItem(null);
        }

        if (!waitForWorkers)
        {
            return;
        }

        foreach (Thread worker in _threads)
        {
            worker.Join();
        }
    }

    /// <summary>Appends an item to the queue and wakes one waiting worker.</summary>
    /// <param name="item">The work to run; null tells one worker to exit.</param>
    public void EnqueueItem(Action item)
    {
        lock (_gate)
        {
            _pending.Enqueue(item);
            // The "queue is empty" blocking condition just changed, so signal a waiter.
            Monitor.Pulse(_gate);
        }
    }

    /// <summary>Worker loop: dequeue and run items until a null item is dequeued.</summary>
    void Consume()
    {
        for (;;)
        {
            Action next;

            lock (_gate)
            {
                // Re-check after every wake-up; another worker may have taken the item.
                while (_pending.Count == 0)
                {
                    Monitor.Wait(_gate);
                }

                next = _pending.Dequeue();
            }

            if (next == null)
            {
                // Null is the exit signal.
                break;
            }

            // Run the item outside the lock so other workers are not blocked.
            next();
        }
    }
}
With the main method:
// Demo entry point: starts a two-worker PCQueue, queues ten one-second jobs,
// then blocks until both workers have drained the queue and exited.
static void Main()
{
    PCQueue queue = new PCQueue(2);
    Console.WriteLine("Enqueuing 10 items...");

    for (int n = 0; n < 10; n++)
    {
        // Copy the loop counter so each delegate captures its own value.
        int captured = n;
        Action job = delegate
        {
            Thread.Sleep(1000); // Simulate time-consuming work
            Console.Write(" Task" + captured);
        };
        queue.EnqueueItem(job);
    }

    queue.Shutdown(true);
    Console.WriteLine();
    Console.WriteLine("Workers complete!");
}
However, while browsing Stack Overflow, I stumbled upon this modified version:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Project
{
    /// <summary>
    /// Generic producer/consumer queue: items of type <typeparamref name="T"/> are
    /// enqueued by callers and dequeued by worker threads. Each worker writes the
    /// dequeued item to the debug output and then sleeps for one second.
    /// A null item tells one worker to exit.
    /// </summary>
    /// <typeparam name="T">Reference type of the queued items.</typeparam>
    public class Multithread<T> : IDisposable where T : class
    {
        // Guards taskQ and is the object workers wait on / producers pulse.
        readonly object locker = new object();
        // Null until TaskQueue is called, and again after Dispose.
        Thread[] workers;
        readonly Queue<T> taskQ = new Queue<T>();

        /// <summary>
        /// Creates and starts the worker threads. This is an ordinary method, not a
        /// constructor, so it must be called explicitly before anything is consumed.
        /// </summary>
        /// <param name="workerCount">Number of worker threads to create and start.</param>
        public void TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
            {
                (workers[i] = new Thread(Consume)).Start();
            }
        }

        /// <summary>
        /// Signals every worker to exit and waits for all of them to terminate.
        /// Does nothing if no workers were started, or if they were already disposed.
        /// </summary>
        public void Dispose()
        {
            Thread[] started = workers;
            if (started == null)
            {
                // TaskQueue was never called (or Dispose already ran): nothing to stop.
                return;
            }

            // Clear the field first so a repeated Dispose does not enqueue stale sentinels.
            workers = null;

            // Enqueue one null task per worker to make each exit.
            for (int i = 0; i < started.Length; i++)
            {
                EnqueueTask(null);
            }

            foreach (Thread worker in started)
            {
                worker.Join();
            }
        }

        /// <summary>Appends a task to the queue and wakes all waiting workers.</summary>
        /// <param name="task">The item to queue; null tells one worker to exit.</param>
        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        /// <summary>Worker loop: dequeue and process items until a null item is dequeued.</summary>
        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    // Re-check after every wake-up: PulseAll wakes every waiter,
                    // but only one of them gets the newly queued item.
                    while (taskQ.Count == 0)
                    {
                        Monitor.Wait(locker);
                    }
                    task = taskQ.Dequeue();
                }
                if (task == null) return; // This signals our exit
                System.Diagnostics.Debug.WriteLine(task);
                Thread.Sleep(1000); // Simulate time-consuming task
            }
        }
    }
}
This seems to provide better usability. However, I can't figure out how to properly add a task to this queue.
// Creates the object whose work should be queued, and a queue whose items are classname instances.
// Note: Multithread starts no worker threads until testthread.TaskQueue(workerCount) is called.
classname testclass = new classname();
Multithread<classname> testthread = new Multithread<classname>();
I thought it would be something along the lines of:
// Evaluates testclass.functioname() right here and enqueues its return value;
// EnqueueTask takes a T (here: classname), not a method to be run later by a worker.
testthread.EnqueueTask(testclass.functioname());
However, it doesn't seem to work. I'm stuck on this problem and couldn't find any help with it elsewhere.
Upvotes: 1
Views: 2910
Reputation: 48949
You can simplify this a lot by using `BlockingCollection`. This data structure is implemented as a queue that already encapsulates the producer-consumer logic.
/// <summary>
/// Producer/consumer queue built on <see cref="BlockingCollection{T}"/>: callers
/// enqueue <see cref="Action"/> delegates and a fixed set of worker threads runs them.
/// </summary>
public class PCQueue
{
    private readonly Thread[] workers;
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly CancellationTokenSource cts = new CancellationTokenSource();

    /// <summary>Creates the queue and immediately starts the worker threads.</summary>
    /// <param name="workerCount">Number of worker threads to create and start.</param>
    public PCQueue(int workerCount)
    {
        workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            // The worker method is Consumer; the original referenced a non-existent "Run".
            workers[i] = new Thread(Consumer);
            workers[i].Start();
        }
    }

    /// <summary>
    /// Cancels the workers' blocking takes so they exit. Items still waiting in the
    /// queue when cancellation is observed are not executed.
    /// </summary>
    /// <param name="waitForWorkers">
    /// When true, blocks until every worker thread has terminated.
    /// </param>
    public void Shutdown(bool waitForWorkers)
    {
        cts.Cancel();
        if (waitForWorkers)
        {
            foreach (Thread thread in workers)
            {
                thread.Join();
            }
        }
    }

    /// <summary>Adds an action to the queue for a worker to execute.</summary>
    /// <param name="action">The work to run; a null action is dequeued and ignored.</param>
    public void EnqueueItem(Action action)
    {
        queue.Add(action);
    }

    /// <summary>
    /// Worker loop: blocks until an action is available, runs it, and repeats until
    /// the cancellation token is signalled by <see cref="Shutdown"/>.
    /// </summary>
    private void Consumer()
    {
        while (true)
        {
            Action action;
            try
            {
                action = queue.Take(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Take throws once Shutdown cancels the token; that is the normal
                // exit signal, so end the worker instead of letting it escape the thread.
                return;
            }

            try
            {
                if (action != null) action();
            }
            catch (Exception)
            {
                // Deliberately best-effort: a failing action must not kill the worker.
                // Notify somebody that something bad happened.
            }
        }
    }
}
Upvotes: 1
Reputation: 106826
I do not see how `Multithread` provides better usability, as it appears to be a demonstration of how to implement the producer/consumer pattern in a generic way without really deciding how to actually consume items. On the other hand, `PCQueue` works with actions, allowing it to actually consume items.
To modify `Multithread` to allow it to do some work, you can remove the generic type parameter `T` and replace all occurrences of `T` by `Action`. In the `Consume` method you then need to replace the code
System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000); // Simulate time-consuming task
by
task();
To enqueue a task, you should do it exactly as it is done when using `PCQueue`: by providing an `Action`. You can use a lambda expression for that.
Upvotes: 1