Reputation: 5964
I have a requirement to run a set of heavy functions asynchronously at sametime and populate the results in a list. Here is the pseudo code for this :
List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();
foreach(var task in tasks)
{
// Run Logic in question
1. Run each task asynchronously/parallely
2. Put the results in the results list upon each task completion
}
Console.WriteLine("All tasks completed and results populated");
I need the logic inside the foreach
bock. Can you guys plz help me?
I have some constraint : The solution must be .net 3.5 compliant (not .net 4, but a .net 4 alternative solution would be appreciated for my knowledge purpose)
Thanks in advance.
Upvotes: 4
Views: 5701
Reputation: 8531
Do your processing in separate worker instances, each on their own thread. Use a callback to pass back the results and signal the calling process that the thread is done. Use a Dictionary to keep track of running threads. If you have lots of threads you should load a Queue and launch new threads as old ones finish. In this example all of the threads are created before any are launched to prevent a race condition where the running thread count drops to zero before the final threads are launched.
Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>();
void LaunchWorkers()
{
foreach (var task in tasks)
{
Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult));
Thread thread = new Thread(worker.Done);
thread.IsBackground = true;
activeThreads.Add(thread.ManagedThreadId, thread);
}
lock (activeThreads)
{
activeThreads.Values.ToList().ForEach(n => n.Start());
}
}
void ProcessResult(int threadId, TResult result)
{
lock (results)
{
results.Add(result);
}
lock (activeThreads)
{
activeThreads.Remove(threadId);
// done when activeThreads.Count == 0
}
}
}
public delegate void WorkerDoneDelegate(object results);
class Worker
{
public WorkerDoneDelegate Done;
public void Work(Task task, WorkerDoneDelegate Done)
{
// process task
Done(Thread.CurrentThread.ManagedThreadId, result);
}
}
Upvotes: 0
Reputation: 38112
A simple 3.5 implementation could look like this
List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();
ManualResetEvent waitHandle = new ManualResetEvent(false);
void RunTasks()
{
int i = 0;
foreach(var task in tasks)
{
int captured = i++;
ThreadPool.QueueUserWorkItem(state => RunTask(task, captured))
}
waitHandle.WaitOne();
Console.WriteLine("All tasks completed and results populated");
}
private int counter;
private readonly object listLock = new object();
void RunTask(Func<T, TResult> task, int index)
{
var res = task(...); //You haven't specified where the parameter comes from
lock (listLock )
{
results[index] = res;
}
if (InterLocked.Increment(ref counter) == tasks.Count)
waitHandle.Set();
}
Upvotes: 4
Reputation: 8008
Another variant would be with a small future pattern implementation:
public class Future<T>
{
public Future(Func<T> task)
{
Task = task;
_asyncContext = task.BeginInvoke(null, null);
}
private IAsyncResult _asyncContext;
public Func<T> Task { get; private set; }
public T Result
{
get
{
return Task.EndInvoke(_asyncContext);
}
}
public bool IsCompleted
{
get { return _asyncContext.IsCompleted; }
}
}
public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks)
{
List<Future<T>> asyncContext = new List<Future<T>>();
foreach (var task in tasks)
{
asyncContext.Add(new Future<T>(task));
}
return asyncContext;
}
public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures)
{
foreach (var future in futures)
{
yield return future.Result;
}
}
public static void Main()
{
var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();
var futures = RunAsync(tasks);
var results = WaitForAll(futures);
foreach (var result in results)
{
Console.WriteLine(result);
}
}
public static int ComputeValue()
{
Thread.Sleep(1000);
return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
}
Upvotes: 1
Reputation: 8008
public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks)
{
List<IAsyncResult> asyncContext = new List<IAsyncResult>();
foreach (var task in tasks)
{
asyncContext.Add(task.BeginInvoke(null, null));
}
return asyncContext;
}
public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext)
{
IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator();
foreach (var task in tasks)
{
iterator.MoveNext();
yield return task.EndInvoke(iterator.Current);
}
}
public static void Main()
{
var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();
var asyncContext = RunAsync(tasks);
var results = WaitForAll(tasks, asyncContext);
foreach (var result in results)
{
Console.WriteLine(result);
}
}
public static int ComputeValue()
{
Thread.Sleep(1000);
return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
}
Upvotes: 1
Reputation: 52679
the traditional way is to use a Sempahore. Initialise the semaphore with the number of threads you're using then kick off a thread per task and wait on the semaphore object. When each thread completes, it should increment the semaphore. When the semaphore count reaches 0, the main thread that was waiting will continue.
Upvotes: 0
Reputation: 25693
List<Func<T, TResult>> tasks = PopulateTasks();
TResult[] results = new TResult[tasks.Length];
Parallel.For(0, tasks.Count, i =>
{
results[i] = tasks[i]();
});
TPL for 3.5 apparently exists.
Upvotes: 4