Jesse Hallam

Reputation: 6964

Wait for pooled threads to complete

I apologize for asking a redundant question. I've found many solutions to my problem, but none of them is explained very well, and I'm hoping it can be made clear here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I want the original thread to block until all of the workers have completed. I have researched ManualResetEvent in particular, but I'm not clear on its use.

In pseudocode:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

If necessary, I will know beforehand how many workers are about to be queued.

Upvotes: 40

Views: 59618

Answers (10)

venkatasai suri

Reputation: 1

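There is no built-in method that waits for all thread pool threads to complete. You can approximate one by polling the number of available threads until it equals the maximum, which means no worker is active (this assumes nothing else in the process is using the pool):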

    {
        bool working = true;
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        while (working)
        {
            ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
            //Console.WriteLine($"{workerThreads} , {maxWorkerThreads}");
            if (workerThreads == maxWorkerThreads)
            { working = false; }
        }
        // once every pool thread is idle, 'working' is false and the loop exits
    }
    void xyz(object o)
    {
        Console.WriteLine("");
    }

Upvotes: -2

nikoo28

Reputation: 2971

Try using CountdownEvent

// code before the threads start

CountdownEvent countdown = new CountdownEvent(collection.Length);

foreach (var o in collection)
{
    ThreadPool.QueueUserWorkItem(delegate
    {
        // do something with the worker
        Console.WriteLine("Thread Done!");
        countdown.Signal();
    });
}
countdown.Wait();

Console.WriteLine("Job Done!");

// resume the code here

The call to countdown.Wait() blocks until every worker has called Signal().

Upvotes: 1

Brian Gideon

Reputation: 48949

Here is a solution using the CountdownEvent class.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Of course, if you have access to the CountdownEvent class then you have the whole TPL to work with. The Parallel class takes care of the waiting for you.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });
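
If each item should run as its own task rather than inside a parallel loop, the same library also provides Task.WaitAll. The following is only a minimal sketch, assuming .NET 4.5 or later (for Task.Run); the Square helper and the sample input are hypothetical placeholders, not part of the question.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class WaitAllSketch
{
    // Hypothetical stand-in for the real per-item work.
    static int Square(int x) { return x * x; }

    public static int[] SquareAll(int[] items)
    {
        // Start one pool-backed task per item.
        Task<int>[] tasks = items.Select(i => Task.Run(() => Square(i))).ToArray();

        // Block the calling thread until every task has finished.
        Task.WaitAll(tasks);

        // All tasks are complete here, so reading Result does not block.
        return tasks.Select(t => t.Result).ToArray();
    }

    static void Main()
    {
        int[] results = SquareAll(new[] { 1, 2, 3 });
        Console.WriteLine(string.Join(", ", results)); // prints "1, 4, 9"
    }
}
```

If any task throws, Task.WaitAll rethrows the failures wrapped in an AggregateException, so the caller sees them on the waiting thread.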

Upvotes: 3

user182129

Reputation:

I have been using the new Parallel task library in CTP here:

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });

Upvotes: 9

Marc Gravell

Reputation: 1062780

First, how long do the workers execute? Pool threads should generally be used for short-lived tasks; if they are going to run for a while, consider manual threads.

Re the problem; do you actually need to block the main thread? Can you use a callback instead? If so, something like:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

This is a fairly lightweight (no OS primitives) way of tracking the workers.

If you need to block, you can do the same using a Monitor (again, avoiding an OS object):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
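    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            Interlocked.Increment(ref running); // count each worker before queuing it
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        // opposite of "start at 1"; wait only if workers are still running,
        // otherwise the pulse would be sent before anyone is waiting for it
        if (Interlocked.Decrement(ref running) != 0) {
            Monitor.Wait(syncLock);
        }
    }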
    Console.WriteLine("all done");

Upvotes: 14

Joseph Kingry

Reputation: 8228

Using .NET 4.0 Barrier class:

        Barrier sync = new Barrier(1);

        foreach(var o in collection)
        {
            WaitCallback worker = (state) => 
            {
                // do work
                sync.SignalAndWait();
            };

            sync.AddParticipant();
            ThreadPool.QueueUserWorkItem(worker, o);
        }

        sync.SignalAndWait();

Upvotes: 1

Gordon Thompson

Reputation: 4834

I've found a good solution here:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

It may come in handy for others with the same issue.

Upvotes: 1

Marc Gravell

Reputation: 1062780

Here's a different approach - encapsulation; so your code could be as simple as:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Where the Forker class is given below (I got bored on the train ;-p)... again, this avoids OS objects, but wraps things up quite neatly (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}

Upvotes: 31

James

Reputation: 12796

I think you were on the right track with the ManualResetEvent. This link has a code sample that closely matches what you're trying to do. The key is to call WaitHandle.WaitAll and pass it an array of wait events. Each thread needs to set one of these wait events.

    // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Make sure that your worker thread's code path sets its event (i.e. autoEvents[1].Set();). Once all of the events are signaled, WaitAll will return.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}
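
The excerpt above leaves out the declarations of the events, so here is a self-contained sketch of the same WaitHandle.WaitAll pattern. It is an illustration under assumptions, not the linked sample: DoubleAll and its doubling "work" are hypothetical, and it uses one ManualResetEvent per worker.

```csharp
using System;
using System.Threading;

static class WaitAllEventsSketch
{
    // Hypothetical example: doubles each input on a pool thread and waits for all of them.
    public static int[] DoubleAll(int[] inputs)
    {
        var results = new int[inputs.Length];
        if (inputs.Length == 0) return results; // WaitAll rejects an empty array

        // One event per worker; WaitAll supports at most 64 handles.
        var done = new WaitHandle[inputs.Length];

        for (int i = 0; i < inputs.Length; i++)
        {
            int index = i; // copy so each closure sees its own index
            var evt = new ManualResetEvent(false);
            done[index] = evt;
            ThreadPool.QueueUserWorkItem(delegate
            {
                try { results[index] = inputs[index] * 2; }
                finally { evt.Set(); } // signal even if the work fails
            });
        }

        // Block until every event has been set.
        WaitHandle.WaitAll(done);

        foreach (WaitHandle handle in done) handle.Close();
        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", DoubleAll(new[] { 1, 2, 3 }))); // prints "2, 4, 6"
    }
}
```

Note that WaitHandle.WaitAll throws NotSupportedException when called with multiple handles on an STA thread, so this form suits console or service code better than a UI thread.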

Upvotes: 1

JaredPar

Reputation: 754715

Try this. The function takes in a list of Action delegates. It will add a ThreadPool worker entry for each item in the list. It will wait for every action to complete before returning.

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[list.Count]; // use the materialized list; avoids enumerating 'actions' twice
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}
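
One caveat: WaitHandle.WaitAll accepts at most 64 handles. For larger batches, a variation is to share a single event and an Interlocked counter instead of one handle per action. The sketch below is a different technique from the function above, offered as an assumption-laden illustration; SpawnAndWaitMany and the summing example are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

static class SpawnAndWaitSketch
{
    // Hypothetical variant: one shared event plus a countdown, so there is no 64-handle limit.
    public static void SpawnAndWaitMany(IEnumerable<Action> actions)
    {
        var list = actions.ToList();
        if (list.Count == 0) return; // nothing to wait for

        int pending = list.Count;
        using (var allDone = new ManualResetEvent(false))
        {
            foreach (var action in list)
            {
                var current = action; // copy for the closure
                ThreadPool.QueueUserWorkItem(delegate
                {
                    // Actions should handle their own exceptions; an unhandled
                    // exception on a pool thread still terminates the process.
                    try { current(); }
                    finally
                    {
                        // The last action to finish releases the waiting thread.
                        if (Interlocked.Decrement(ref pending) == 0) allDone.Set();
                    }
                });
            }

            allDone.WaitOne(); // block until the counter reaches zero
        }
    }

    static void Main()
    {
        int total = 0;
        var actions = Enumerable.Range(1, 100)
            .Select(n => (Action)(() => Interlocked.Add(ref total, n)));
        SpawnAndWaitMany(actions);
        Console.WriteLine(total); // prints 5050
    }
}
```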

Upvotes: 56
