Reputation: 16040
I'm working on .NET 2.0 and need to implement some threading. I have looked at sample code but cannot find anything that implements event notification. I need to know when all the work is done, and ideally report progress for something like a progress bar.
I have been playing with the following code but can't seem to get the event notification right. How do I detect that processing has finished, and how can I update the UI with what has been done so far?
Example code
class Program
{
static void Main(string[] args)
{
using (PCQueue q = new PCQueue(2))
{
q.TaskCompleted += new EventHandler(OnTaskCompleted);
q.PercentageCompleted += new EventHandler(OnPercentageCompleted);
for (int i = 1; i < 100; i++)
{
string itemNumber = i.ToString(); // To avoid the captured variable trap
q.EnqueueItem(itemNumber);
}
Console.WriteLine("Waiting for items to complete...");
Console.Read();
}
}
private static void OnPercentageCompleted(object sender, EventArgs e)
{
}
static void OnTaskCompleted(object sender, EventArgs e)
{
}
}
public class PCQueue : IDisposable
{
readonly object locker = new object();
Thread[] _workers;
Queue<string> _itemQ = new Queue<string>();
public PCQueue(int workerCount)
{
_workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
{
(_workers[i] = new Thread(Consume)).Start();
}
}
public void EnqueueItem(string item)
{
lock (locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(locker); // changing a blocking condition.
}
}
void Consume()
{
while (true) // Keep consuming until
{ // told otherwise.
string item;
lock (locker)
{
while (_itemQ.Count == 0) Monitor.Wait(locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
DoSomething(item); // Execute item.
}
}
private void DoSomething(string item)
{
Console.WriteLine(item);
}
public void Dispose()
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
{
EnqueueItem(null);
}
}
//where/how can I fire this event???
public event EventHandler TaskCompleted;
protected void OnCompleted(EventArgs e)
{
if (this.TaskCompleted != null)
{
this.TaskCompleted(this, e);
}
}
//where/how can I fire this event???
public event EventHandler PercentageCompleted;
protected void OnPercentageCompleted(EventArgs e)
{
if (this.PercentageCompleted != null)
{
this.PercentageCompleted(this, e);
}
}
}
Any suggestions?
Upvotes: 3
Views: 1168
Reputation: 19020
You can't raise the progress event inside your queue, for the simple reason that the queue does not know the total number of items that are supposed to be processed, so it can't calculate a percentage. You just put something in and it gets processed.
What you could do is raise an ItemProcessed event and subscribe to that. Then your main program can count how many items have been processed so far relative to how many are supposed to be processed.
You can raise the completed event just before returning from your Consume function. However, you need to keep track of how many threads are still active, as Brian said in his answer. I modified the code to reflect that.
So something along these lines:
...
private int _ActiveThreads;
public PCQueue(int workerCount)
{
_ActiveThreads = workerCount;
_workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
{
(_workers[i] = new Thread(Consume)).Start();
}
}
void Consume()
{
while (true) // Keep consuming until
{ // told otherwise.
string item;
lock (locker)
{
while (_itemQ.Count == 0) Monitor.Wait(locker);
item = _itemQ.Dequeue();
}
if (item == null) // This signals our exit.
{
if (Interlocked.Decrement(ref _ActiveThreads) == 0)
{
OnCompleted(EventArgs.Empty);
}
return;
}
DoSomething(item); // Execute item.
OnItemProcessed();
}
}
public event EventHandler ItemProcessed;
protected void OnItemProcessed()
{
EventHandler handler = ItemProcessed; // explicit type: 'var' needs the C# 3.0 compiler
if (handler != null)
{
handler(this, EventArgs.Empty);
}
}
...
Of course, you might want to create some meaningful event args and actually pass the processed item to the event.
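As a rough sketch of what such event args could look like: the names ItemProcessedEventArgs and ItemNotifier below are hypothetical and not part of the original code, and ItemNotifier only stands in for the event-raising part of PCQueue. It sticks to C# 2.0 syntax and uses the generic EventHandler<TEventArgs> delegate.

```csharp
using System;

// Hypothetical event args that carry the processed item to subscribers.
public class ItemProcessedEventArgs : EventArgs
{
    private readonly string _item;

    public ItemProcessedEventArgs(string item)
    {
        _item = item;
    }

    public string Item
    {
        get { return _item; }
    }
}

// Minimal stand-in for the part of PCQueue that raises the event.
public class ItemNotifier
{
    public event EventHandler<ItemProcessedEventArgs> ItemProcessed;

    public void OnItemProcessed(string item)
    {
        // Snapshot the delegate so the null check and the call use the same value.
        EventHandler<ItemProcessedEventArgs> handler = ItemProcessed;
        if (handler != null)
        {
            handler(this, new ItemProcessedEventArgs(item));
        }
    }
}
```

In PCQueue itself, Consume would then call OnItemProcessed(item) after DoSomething(item), and subscribers would read e.Item in their handler.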
Then in main:
...
static void Main(string[] args)
{
using (PCQueue q = new PCQueue(2))
{
q.ItemProcessed += ItemProcessed;
q.TaskCompleted += OnTaskCompleted;
for (int i = 1; i <= totalNumberOfItems; i++)
{
string itemNumber = i.ToString(); // To avoid the captured variable trap
q.EnqueueItem(itemNumber);
}
Console.WriteLine("Waiting for items to complete...");
Console.Read();
}
}
private static int currentProcessCount = 0;
private static int totalNumberOfItems = 100;
private static void ItemProcessed(object sender, EventArgs e)
{
// The handler runs on the worker threads, so increment atomically.
int processed = Interlocked.Increment(ref currentProcessCount);
Console.WriteLine("Progress: {0}%", processed * 100.0 / totalNumberOfItems);
}
static void OnTaskCompleted(object sender, EventArgs e)
{
Console.WriteLine("Done");
}
...
Needless to say, all the static members should go. This is just based on your example.
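One possible way to get rid of the statics is to move the counting into a small instance class. This is only a sketch: ProgressTracker and ItemDone are made-up names, and it assumes the total number of items is known up front.

```csharp
using System;
using System.Threading;

// Hypothetical helper: counts processed items and reports a percentage.
public class ProgressTracker
{
    private readonly int _total;
    private int _processed;

    public ProgressTracker(int total)
    {
        if (total <= 0) throw new ArgumentOutOfRangeException("total");
        _total = total;
    }

    // Safe to call from several worker threads at once.
    // Returns the percentage completed after counting this item.
    public double ItemDone()
    {
        int done = Interlocked.Increment(ref _processed);
        return done * 100.0 / _total;
    }
}
```

The ItemProcessed handler would then call tracker.ItemDone() on an instance created next to the queue and print or display the returned value.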
One more remark:
Your PCQueue currently requires that you enqueue as many null values as you have worker threads; otherwise only one thread will quit and the others will wait until your process exits. You can change that by peeking at the first item and only removing it when it is not null, thus leaving the marker there for all threads. So Consume would change to this:
void Consume()
{
while (true) // Keep consuming until
{ // told otherwise.
string item;
lock (locker)
{
while (_itemQ.Count == 0) Monitor.Wait(locker);
item = _itemQ.Peek();
if (item != null) _itemQ.Dequeue();
else Monitor.PulseAll(locker); // if the head of the queue is null, wake all other threads so they can quit too
}
if (item == null) // This signals our exit.
{
if (Interlocked.Decrement(ref _ActiveThreads) == 0)
{
OnCompleted(EventArgs.Empty);
}
return;
}
DoSomething(item); // Execute item.
OnItemProcessed();
}
}
Upvotes: 2
Reputation: 48949
In your PCQueue class you will need to keep track of how many worker threads are still active and raise TaskCompleted only after all threads have been instructed to end.
void Consume()
{
while (true)
{
string item;
lock (locker)
{
while (_itemQ.Count == 0) Monitor.Wait(locker);
item = _itemQ.Dequeue();
}
if (item == null)
{
// activeThreads is set to the number of workers in the constructor.
if (Interlocked.Decrement(ref activeThreads) == 0)
{
// Take a snapshot of the event so that a null check + invocation is safe.
// This works because delegates are immutable.
EventHandler copy = TaskCompleted; // explicit type: 'var' needs the C# 3.0 compiler
if (copy != null)
{
copy(this, new EventArgs());
}
}
return;
}
DoSomething(item); // Execute item.
}
}
A couple of other points:
- Remember to marshal the TaskCompleted event handler back onto the UI thread before touching any UI controls.
- You could raise PercentageCompleted from DoSomething, but without a clear indication of how many items the queue is supposed to hold, the value will not make sense. I second Chris' recommendation on this point.
Upvotes: 2
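On the point about getting the TaskCompleted handler back onto the UI thread: one option available in .NET 2.0 is SynchronizationContext. The sketch below is an assumption about how it could be wired up, not part of the original answer; UiNotifier and Raise are hypothetical names. It captures the context of the thread that constructs it (the UI thread in a WinForms app) and posts the handler call there.

```csharp
using System;
using System.Threading;

// Hypothetical helper: invokes a handler on the context captured at construction.
public class UiNotifier
{
    private readonly SynchronizationContext _context;

    public UiNotifier()
    {
        // Construct this on the UI thread. If no context is installed
        // (for example in a console app), fall back to the default context,
        // which runs the callback on a thread-pool thread.
        _context = SynchronizationContext.Current ?? new SynchronizationContext();
    }

    // Can be called from any worker thread; returns without waiting for the handler.
    public void Raise(EventHandler handler, object sender)
    {
        if (handler == null) return;
        _context.Post(delegate(object state) { handler(sender, EventArgs.Empty); }, null);
    }
}
```

In a WinForms application, calling BeginInvoke on the form or control from inside the handler is an alternative that achieves the same thing.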