user9969
user9969

Reputation: 16040

Producer Consumer Pattern How do you notify completion?

Working on .net 2.0 I need to implement the some threading and I was looking at a dummy examples but cannot find anything which implement eventnotification. Need to know when all is done and also some sort of progress bar if you like.

I am been playing with following code by cannot see to get the event notification correctly. How do I detect that I have finished the processing and possible updating the ui with what I have been doing?

Example code

 class Program
{
    static void Main(string[] args)
    {
        using (PCQueue q = new PCQueue(2))
        {
            q.TaskCompleted += new EventHandler(OnTaskCompleted);
            q.PercentageCompleted += new EventHandler(OnPercentageCompleted);


            for (int i = 1; i < 100; i++)
            {

                string itemNumber = i.ToString(); // To avoid the captured variable trap
                q.EnqueueItem(itemNumber);
            }
          Console.WriteLine("Waiting for items to complete...");
            Console.Read();
        }
    }

    private static void OnPercentageCompleted(object sender, EventArgs e)
    {

    }

    static void OnTaskCompleted(object sender, EventArgs e)
    {

    }
}


public class PCQueue : IDisposable
{
    readonly object locker = new object();
    Thread[] _workers;
    Queue<string> _itemQ = new Queue<string>();
    public PCQueue(int workerCount)
    {
        _workers = new Thread[workerCount];
        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            (_workers[i] = new Thread(Consume)).Start();
        }
    }

    public void EnqueueItem(string item)
    {
        lock (locker)
        {
            _itemQ.Enqueue(item); // We must pulse because we're
            Monitor.Pulse(locker); // changing a blocking condition.
        }
    }
    void Consume()
    {
        while (true) // Keep consuming until
        { // told otherwise.
            string item;
            lock (locker)
            {
                while (_itemQ.Count == 0) Monitor.Wait(locker);
                item = _itemQ.Dequeue();
            }
            if (item == null) return; // This signals our exit.

            DoSomething(item); // Execute item.
        }
    }
    private void DoSomething(string item)
    {
       Console.WriteLine(item);
    }
    public void Dispose()
    {
        // Enqueue one null item per worker to make each exit.
        foreach (Thread worker in _workers)
        {
            EnqueueItem(null);
        }
    }

    //where/how  can I fire this event???
    public event EventHandler TaskCompleted;
    protected void OnCompleted(EventArgs e)
    {
        if (this.TaskCompleted != null)
        {
            this.TaskCompleted(this, e);
        }
    }
    //where/how can I fire this event???
    public event EventHandler PercentageCompleted;
    protected void OnPercentageCompleted(EventArgs e)
    {
        if (this.PercentageCompleted != null)
        {
            this.PercentageCompleted(this, e);
        }
    }
   }

Any suggestions?

Upvotes: 3

Views: 1168

Answers (2)

ChrisWue
ChrisWue

Reputation: 19020

You can't raise the progress event inside your queue for the simple reason that the queue does not know the total number items which are supposed to be processed. So it can't calculate a percentage. You just stick something in and it gets processed.

What you could do is to raise a ItemProcessed event and subscribe to that. Then in your main program you can do the logic of counting how many items were processed so far in relation to how many are supposed to be processed.

You can raise the complete event just before you are returning from your Consume function. However you need to keep track of how many threads are still active as Brian said in his answer. I modified the code to reflect that.

So something along these lines:

...
private int _ActiveThreads;
public PCQueue(int workerCount)
{
    _ActiveThreads = workerCount;
    _workers = new Thread[workerCount];
    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
    {
        (_workers[i] = new Thread(Consume)).Start();
    }
}

void Consume()
{
    while (true) // Keep consuming until
    { // told otherwise.
        string item;
        lock (locker)
        {
            while (_itemQ.Count == 0) Monitor.Wait(locker);
            item = _itemQ.Dequeue();
        }
        if (item == null)  // This signals our exit.
        {
            if (Interlocked.Decrement(ref _ActiveThreads) == 0)
            {
                OnCompleted(EventArgs.Empty);
            }
            return;
        }

        DoSomething(item); // Execute item.
        OnItemProcessed();
    }
}

public event EventHandler ItemProcessed;
protected void OnItemProcessed()
{
    var handler = ItemProcessed;
    if (handler != null)
    {
        handler(this, EventArgs.Empty);
    }
}
...

Of course you might want to create some meaningfull event args and actually pass the item which was processed to the event.

Then in main:

...
static void Main(string[] args)
{
    using (PCQueue q = new PCQueue(2))
    {
        q.ItemProcessed += ItemProcessed;
        q.TaskCompleted += OnTaskCompleted;

        for (int i = 1; i <= totalNumberOfItems; i++)
        {
            string itemNumber = i.ToString(); // To avoid the captured variable trap
            q.EnqueueItem(itemNumber);
        }
        Console.WriteLine("Waiting for items to complete...");
        Console.Read();
    }
}

private static int currentProcessCount = 0;
private static int totalNumberOfItems = 100;

private static void ItemProcessed(object sender, EventArgs e)
{
     currentProcessCount++;
     Console.WriteLine("Progress: {0}%", ((double)currentProcessCount / (double)totalNumberOfItems) * 100.0);
}

static void OnTaskCompleted(object sender, EventArgs e)
{
     Console.WriteLine("Done");
}
...

Needless to say that all that static stuff should go. This is just based on your example.

One more remark: Your PCQueue currently requires that you enqueue as many null values as you have worker threads otherwise only one thread will quit and the others will wait until your process quits. You can change that by looking at the first item and only removing it when it is not null - thus leaving the marker there for all threads. So Consume would change to this:

void Consume()
{
    while (true) // Keep consuming until
    { // told otherwise.
        string item;
        lock (locker)
        {
            while (_itemQ.Count == 0) Monitor.Wait(locker);
            item = _itemQ.Peek();
            if (item != null) _itemQ.Dequeue();
            else Monitor.PulseAll(); // if the head of the queue is null then make sure all other threads are also woken up so they can quit
        }
        if (item == null)  // This signals our exit.
        {
            if (Interlocked.Decrement(ref _ActiveThreads) == 0)
            {
                OnCompleted(EventArgs.Empty);
            }
            return;
        }

        DoSomething(item); // Execute item.
        OnItemProcessed();
    }
}

Upvotes: 2

Brian Gideon
Brian Gideon

Reputation: 48949

In your PCQueue class you will need to keep track of how many worker threads are still active and raise TaskCompleted only after all threads have been instructed to end.

void Consume()
{
    while (true)
    {
        string item;
        lock (locker)
        {
            while (_itemQ.Count == 0) Monitor.Wait(locker);
            item = _itemQ.Dequeue();
        }

        if (item == null)
        {
            // activeThreads is set to the number of workers in the constructor.
            if (Interlocked.Decrement(ref activeThreads) == 0)
            {
              // Take a snapshot of the event so that a null check + invocation is safe.
              // This works because delegates are immutable.
              var copy = TaskCompleted;

              if (copy != null)
              {
                copy(this, new EventArgs());
              }
            }
            return;
        }

        DoSomething(item); // Execute item.
    }
}

A couple of other points:

  • Kudos for getting the blocking queue implemented correctly. Most people get it wrong.
  • Remember to marshal the TaskCompleted event handler back onto the UI thread before touching any UI controls.
  • You could raise PercentCompleted from DoSomething, but without a clear indication of how many items the queue is suppose to hold the value will not make sense. I second Chris' recommendation on this point.

Upvotes: 2

Related Questions