Anindya Chatterjee
Anindya Chatterjee

Reputation: 5964

How to run a set of functions in parallel and wait for the results upon completion?

I have a requirement to run a set of heavy functions asynchronously at sametime and populate the results in a list. Here is the pseudo code for this :

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

foreach(var task in tasks)
{
    // Run Logic in question
    1. Run each task asynchronously/parallely
    2. Put the results in the results list upon each task completion
}

Console.WriteLine("All tasks completed and results populated");

I need the logic inside the foreach bock. Can you guys plz help me?

I have some constraint : The solution must be .net 3.5 compliant (not .net 4, but a .net 4 alternative solution would be appreciated for my knowledge purpose)

Thanks in advance.

Upvotes: 4

Views: 5701

Answers (6)

Ed Power
Ed Power

Reputation: 8531

Do your processing in separate worker instances, each on their own thread. Use a callback to pass back the results and signal the calling process that the thread is done. Use a Dictionary to keep track of running threads. If you have lots of threads you should load a Queue and launch new threads as old ones finish. In this example all of the threads are created before any are launched to prevent a race condition where the running thread count drops to zero before the final threads are launched.

    Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>();
    void LaunchWorkers()
    {
        foreach (var task in tasks)
        {
            Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult));
            Thread thread = new Thread(worker.Done);
            thread.IsBackground = true;
            activeThreads.Add(thread.ManagedThreadId, thread);
        }
        lock (activeThreads)
        {
            activeThreads.Values.ToList().ForEach(n => n.Start());
        }
    }

    void ProcessResult(int threadId, TResult result)
    {
        lock (results)
        {
            results.Add(result);
        }
        lock (activeThreads)
        {
            activeThreads.Remove(threadId);
            // done when activeThreads.Count == 0
        }
    }
}

public delegate void WorkerDoneDelegate(object results);
class Worker
{
    public WorkerDoneDelegate Done;
    public void Work(Task task, WorkerDoneDelegate Done)
    {
        // process task
        Done(Thread.CurrentThread.ManagedThreadId, result);
    }
}

Upvotes: 0

Ohad Schneider
Ohad Schneider

Reputation: 38112

A simple 3.5 implementation could look like this

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

ManualResetEvent waitHandle = new ManualResetEvent(false);
void RunTasks()
{
    int i = 0;
    foreach(var task in tasks)
    {
        int captured = i++;
        ThreadPool.QueueUserWorkItem(state => RunTask(task, captured))
    }

    waitHandle.WaitOne();

    Console.WriteLine("All tasks completed and results populated");
}

private int counter;
private readonly object listLock = new object();
void RunTask(Func<T, TResult> task, int index)
{
    var res = task(...); //You haven't specified where the parameter comes from
    lock (listLock )
    {
       results[index] = res;
    }
    if (InterLocked.Increment(ref counter) == tasks.Count)
        waitHandle.Set();
}

Upvotes: 4

Adrian Zanescu
Adrian Zanescu

Reputation: 8008

Another variant would be with a small future pattern implementation:

    public class Future<T>
    {
        public Future(Func<T> task)
        {
            Task = task;
            _asyncContext = task.BeginInvoke(null, null);
        }

        private IAsyncResult _asyncContext;

        public Func<T> Task { get; private set; }
        public T Result
        {
            get
            {
                return Task.EndInvoke(_asyncContext);
            }
        }

        public bool IsCompleted
        {
            get { return _asyncContext.IsCompleted; }
        }
    }

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<Future<T>> asyncContext = new List<Future<T>>();
        foreach (var task in tasks)
        {
            asyncContext.Add(new Future<T>(task));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures)
    {
        foreach (var future in futures)
        {
            yield return future.Result;
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var futures = RunAsync(tasks);
        var results = WaitForAll(futures);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
    }

Upvotes: 1

Adrian Zanescu
Adrian Zanescu

Reputation: 8008

    public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<IAsyncResult> asyncContext = new List<IAsyncResult>();
        foreach (var task in tasks)
        {
            asyncContext.Add(task.BeginInvoke(null, null));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext)
    {
        IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator();
        foreach (var task in tasks)
        {
            iterator.MoveNext();
            yield return task.EndInvoke(iterator.Current);
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var asyncContext = RunAsync(tasks);
        var results = WaitForAll(tasks, asyncContext);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    }

Upvotes: 1

gbjbaanb
gbjbaanb

Reputation: 52679

the traditional way is to use a Sempahore. Initialise the semaphore with the number of threads you're using then kick off a thread per task and wait on the semaphore object. When each thread completes, it should increment the semaphore. When the semaphore count reaches 0, the main thread that was waiting will continue.

Upvotes: 0

wRAR
wRAR

Reputation: 25693

List<Func<T, TResult>> tasks = PopulateTasks();
TResult[] results = new TResult[tasks.Length];
Parallel.For(0, tasks.Count, i =>
    {
        results[i] = tasks[i]();
    });

TPL for 3.5 apparently exists.

Upvotes: 4

Related Questions