Reputation: 21551
I'd like to implement the following in a Silverlight 5.0 application using Task Parallel Library (SL5 has Task factories but no Parallel.For). I have plenty of threading knowledge but none on TPL so this seems a good task to get some :)
Currently I have some code, which executes synchronously as follows:
public interface IProcessor
{
IEnumerable<Bar> Provide(Foo param)
}
private IEnumerable<IProcessor> processors;
public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
List<Bar> allResults = new List<Bar>();
foreach(var processor in this.processors)
{
allResults.AddRange(processor.Provide(param));
}
callback(allResults);
}
Consider each IProcessor
accepts a Foo
parameter to Provide
returning an IEnumerable<Bar>
. The aggregation of all results are sent back to the caller via the callback.
Now some of the IProcessors execute immediately. Some make a call to a server and can take several seconds. I would like to schedule N tasks for N IProcessor
instances and when all are complete (or time-out) concatenate the IEnumerable<Bar>
results.
If possible I'd like to add a timeout to the overall operation so if all don't complete within 15 seconds, throw.
Your help much appreciated :)
Upvotes: 4
Views: 1168
Reputation: 284
Again I can't test this code but it should work if Silverlight doesn't have Parallel.ForEach you can you use Task.WaitAll
private IEnumerable<IProcessor> processors;
public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
var allResults = new ConcurrentQueue<Bar>();
Task.WaitAll(processors.Select(processor => Task.Factory.StartNew(() => GetData(processor, param, allResults))).ToArray());
callback(allResults);
}
private static void GetData(IProcessor processor, Foo param, ConcurrentQueue<Bar> allResults)
{
var enumerable = processor.Provide(param);
foreach (var bar in enumerable)
{
allResults.Enqueue(bar);
}
}
Upvotes: 4
Reputation: 23831
Why not use something like the following
// Allow for cancellation.
CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken token = new CancellationToken();
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
List<Bar> allResults = new List<Bar>();
Task<List<Bar>> asyncTask = Task.Factory.StartNew<List<Bar>>(() => asyncMethod(token, atp), token);
// Continuation is returned when the asyncMethod is complete.
asyncTask.ContinueWith(task =>
{
// Handle the result.
switch (task.Status)
{
// Handle any exceptions to prevent UnobservedTaskException.
case TaskStatus.RanToCompletion:
break;
case TaskStatus.Canceled:
break;
case TaskStatus.Faulted:
}
}
In your asyncMethod
you could do something like the following
private List<Bar> asyncMethod(CancellationToken token)
{
List<Bar> allResults = new List<Bar>();
foreach(var processor in this.processors)
{
Task.Factory.StartNew<IEnumerable<Bar>>(() =>
{
processor.Provide(param);
}, atp).ContinueWith( cont => { allResults.AddRange(cont.Result) });
// Cancellation requested from UI Thread.
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
}
return allResults;
}
Yo can then get the overall result (a List<Bar>
) from the continuation called task
in the first snippet. You call the cancellation through some event through the UI like
// Cancellation requested from UI Thread.
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
I could not test this, but something like the above should work. Please see this great introduction to TPL for more information and use of the class...
I do hope this is of use.
Upvotes: 1
Reputation: 4795
I think this is roughly correct
public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
var allResults = new List<Bar>();
// We are using all the default options on the TaskFactory
// except when we are appending the results this has to be synchronized
// as List<> is not multithreading safe (a more appropriate collection perhaps)
var taskFactory = new TaskFactory<IEnumerable<Bar>>(
TaskCreationOptions.None,
TaskContinuationOptions.ExecuteSynchronously);
// Kick off a task for every processor
var tasks =
new List<Task<IEnumerable<Bar>>>(processors.Count());
tasks.AddRange(
processors.Select(
processor =>
taskFactory.StartNew(() => processor.Provide(param))));
if (Task.WaitAll(tasks.ToArray(), 5 * 1000))
{
foreach (Task<IEnumerable<Bar>> task in tasks)
{
allResults.AddRange(task.Result);
}
callback(allResults);
}
}
Upvotes: 1
Reputation: 62544
This would run all tasks in parallel asynchronously:
public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
// since each task's callback would access this storage - we are using
// one of the concurrent queue
ConcurrentQueue<Bar> allResults = new ConcurrentQueue<Bar>();
Task[] tasks = this.processors.Select(p => new Task(() =>
{
IEnumerable<Bar> results = p.Provide(param);
foreach (var newItem in results)
{
allResults.Enqueue(newItem);
}
})).ToArray();
foreach (var task in tasks)
{
task.Start();
}
// 5 seconds to wait or inject a value into this method
Task.WaitAll(tasks, 5000);
callback(allResults);
}
Upvotes: 1
Reputation: 7636
With TPL you can pass the loop state to inform the other threads to abort in the case of a timeout. You need to do something like:
public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
ConcurrentBag<Bar> allResults = new ConcurrentBag<Bar>();
Stopwatch sw = new Stopwatch();
sw.Start();
Parallel.ForEach(this.processors, (processor, loopState) =>
{
foreach (Bar item in processor.Provide(param))
{
allResults.Add(item);
}
if (sw.ElapsedMilliseconds > 15000)
{
loopState.Stop();
throw new TimeoutException();
}
});
callback(allResults);
}
Upvotes: 0