Parallelization of multiple operations and concatenation of results

I'd like to implement the following in a Silverlight 5.0 application using the Task Parallel Library (SL5 has Task factories but no Parallel.For). I have plenty of threading knowledge but none of the TPL, so this seems a good task to get some :)

Currently I have some code that executes synchronously, as follows:

public interface IProcessor
{
    IEnumerable<Bar> Provide(Foo param);
}

private IEnumerable<IProcessor> processors; 

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    List<Bar> allResults = new List<Bar>();

    foreach(var processor in this.processors)
    {
        allResults.AddRange(processor.Provide(param));
    }

    callback(allResults);
}

Each IProcessor accepts a Foo parameter to Provide and returns an IEnumerable<Bar>. The aggregate of all results is sent back to the caller via the callback.

Now some of the IProcessors execute immediately. Some make a call to a server and can take several seconds. I would like to schedule N tasks for N IProcessor instances and when all are complete (or time-out) concatenate the IEnumerable<Bar> results.

If possible I'd like to add a timeout to the overall operation, so that if the tasks don't all complete within 15 seconds, it throws.

Your help much appreciated :)

Upvotes: 4

Views: 1168

Answers (5)

jitidea

Reputation: 284

Again, I can't test this code, but it should work. If Silverlight doesn't have Parallel.ForEach, you can use Task.WaitAll:

    private IEnumerable<IProcessor> processors;

    public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
    {
        var allResults = new ConcurrentQueue<Bar>();
        Task.WaitAll(processors.Select(processor => Task.Factory.StartNew(() => GetData(processor, param, allResults))).ToArray());
        callback(allResults);
    }

    private static void GetData(IProcessor processor, Foo param, ConcurrentQueue<Bar> allResults)
    {
        var enumerable = processor.Provide(param);
        foreach (var bar in enumerable)
        {
            allResults.Enqueue(bar);
        }
    }

Upvotes: 4

MoonKnight

Reputation: 23831

Why not use something like the following?

// Allow for cancellation.
CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;

Task<List<Bar>> asyncTask = Task.Factory.StartNew<List<Bar>>(() => asyncMethod(param, token), token);

// The continuation runs once asyncMethod (and its attached child tasks) has finished.
asyncTask.ContinueWith(task =>
{
    // Handle the result.
    switch (task.Status)
    {
        case TaskStatus.RanToCompletion:
            break;
        case TaskStatus.Canceled:
            break;
        case TaskStatus.Faulted:
            // Observe task.Exception here to prevent an UnobservedTaskException.
            break;
    }
});

In your asyncMethod you could do something like the following:

private List<Bar> asyncMethod(Foo param, CancellationToken token)
{
    List<Bar> allResults = new List<Bar>();
    object sync = new object();

    foreach (var processor in this.processors)
    {
        // Cancellation requested from UI Thread.
        token.ThrowIfCancellationRequested();

        // Copy the loop variable: before C# 5 every closure shares it.
        var current = processor;

        // Attached children and continuations must finish before the parent task completes.
        Task.Factory.StartNew<IEnumerable<Bar>>(
            () => current.Provide(param), TaskCreationOptions.AttachedToParent)
            .ContinueWith(cont =>
            {
                // List<Bar> is not thread-safe, so serialize the appends.
                lock (sync) { allResults.AddRange(cont.Result); }
            }, TaskContinuationOptions.AttachedToParent);
    }
    return allResults;
}

You can then get the overall result (a List<Bar>) from task.Result inside the continuation in the first snippet. To cancel, call cancelSource.Cancel() from some UI event; the worker observes the request through a check like

// Cancellation requested from UI Thread.
token.ThrowIfCancellationRequested();

I could not test this, but something like the above should work. Please see this great introduction to TPL for more information and use of the class...

I do hope this is of use.
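
As a minimal sketch of the cooperative cancellation described above: the class name CancellationSketch and the method StartWorker are hypothetical, and the loop with Thread.Sleep merely stands in for real work. It only illustrates that a task started with a token ends in the Canceled state when it throws through ThrowIfCancellationRequested on that same token.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Starts a worker that polls the token between units of (simulated) work.
    public static Task StartWorker(CancellationToken token)
    {
        return Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                // Throws OperationCanceledException once Cancel() has been called.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10); // placeholder for real work
            }
        }, token); // passing the same token lets the task report Canceled, not Faulted
    }
}
```

A caller would keep the CancellationTokenSource, pass its Token to StartWorker, and call Cancel() from the UI event handler.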

Upvotes: 1

M Afifi

Reputation: 4795

I think this is roughly correct:

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    var allResults = new List<Bar>();

    // We use the default TaskFactory options, apart from ExecuteSynchronously for continuations.
    // Results are appended only after WaitAll returns, on this thread, because List<> is
    // not thread-safe (a concurrent collection would be an alternative).
    var taskFactory = new TaskFactory<IEnumerable<Bar>>(
        TaskCreationOptions.None,
        TaskContinuationOptions.ExecuteSynchronously);

    // Kick off a task for every processor
    var tasks =
        new List<Task<IEnumerable<Bar>>>(processors.Count());
    tasks.AddRange(
        processors.Select(
            processor =>
            taskFactory.StartNew(() => processor.Provide(param))));

    if (Task.WaitAll(tasks.ToArray(), 5 * 1000))
    {
        foreach (Task<IEnumerable<Bar>> task in tasks)
        {
            allResults.AddRange(task.Result);
        }
        callback(allResults);
    }
} 
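
The question asks for the overall operation to throw on timeout, whereas the snippet above silently skips the callback. Here is a hedged sketch of that variant. Foo, Bar and IProcessor are stand-ins for the question's types, and DelegateProcessor, TimedAggregator and ProvideAll are hypothetical names introduced only for illustration. It relies on Task.WaitAll(Task[], TimeSpan) returning false when the timeout elapses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-ins for the question's types.
public class Foo { }
public class Bar { }
public interface IProcessor { IEnumerable<Bar> Provide(Foo param); }

// Hypothetical helper so the sketch can be exercised without real processors.
public class DelegateProcessor : IProcessor
{
    private readonly Func<Foo, IEnumerable<Bar>> provide;
    public DelegateProcessor(Func<Foo, IEnumerable<Bar>> provide) { this.provide = provide; }
    public IEnumerable<Bar> Provide(Foo param) { return provide(param); }
}

public static class TimedAggregator
{
    // Runs each processor on its own task and concatenates the results.
    // Throws TimeoutException if not every task finishes within the timeout.
    public static List<Bar> ProvideAll(IEnumerable<IProcessor> processors, Foo param, TimeSpan timeout)
    {
        // ToList() forces enumeration on the worker thread rather than lazily on the caller.
        Task<List<Bar>>[] tasks = processors
            .Select(p => Task.Factory.StartNew<List<Bar>>(() => p.Provide(param).ToList()))
            .ToArray();

        // WaitAll returns false on timeout; task failures surface as AggregateException.
        if (!Task.WaitAll(tasks, timeout))
        {
            throw new TimeoutException("Not all processors completed in time.");
        }

        return tasks.SelectMany(t => t.Result).ToList();
    }
}
```

Called as TimedAggregator.ProvideAll(processors, param, TimeSpan.FromSeconds(15)), this blocks the calling thread for at most the timeout. Tasks that have not finished by then keep running in the background; stopping them would need cooperative cancellation inside Provide.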

Upvotes: 1

sll

Reputation: 62544

This runs all the tasks in parallel, then waits up to 5 seconds for them to finish:

public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
{
    // Every task writes to this storage, so we use a thread-safe
    // collection (a ConcurrentQueue).
    ConcurrentQueue<Bar> allResults = new ConcurrentQueue<Bar>();

    Task[] tasks = this.processors.Select(p => new Task(() =>
        {
            IEnumerable<Bar> results = p.Provide(param);
            foreach (var newItem in results)
            {
                allResults.Enqueue(newItem);
            }
        })).ToArray();

    foreach (var task in tasks)
    {
        task.Start();
    }

    // Wait up to 5 seconds (or inject a timeout value into this method).
    // On timeout, the callback receives whatever has been collected so far.
    Task.WaitAll(tasks, 5000);
    callback(allResults);
}
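
Because Task.WaitAll blocks the calling thread, a non-blocking alternative may be preferable when the caller is the UI thread. The sketch below uses Task.Factory.ContinueWhenAll to aggregate results once every task has finished. Foo, Bar and IProcessor are stand-ins for the question's types; DelegateProcessor, AsyncAggregator and ProvideAllAsync are hypothetical names, and no timeout is handled here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-ins for the question's types.
public class Foo { }
public class Bar { }
public interface IProcessor { IEnumerable<Bar> Provide(Foo param); }

// Hypothetical helper so the sketch can be exercised without real processors.
public class DelegateProcessor : IProcessor
{
    private readonly Func<Foo, IEnumerable<Bar>> provide;
    public DelegateProcessor(Func<Foo, IEnumerable<Bar>> provide) { this.provide = provide; }
    public IEnumerable<Bar> Provide(Foo param) { return provide(param); }
}

public static class AsyncAggregator
{
    // Returns immediately; the returned task completes with the concatenated
    // results once every processor has finished.
    public static Task<List<Bar>> ProvideAllAsync(IEnumerable<IProcessor> processors, Foo param)
    {
        Task<List<Bar>>[] tasks = processors
            .Select(p => Task.Factory.StartNew<List<Bar>>(() => p.Provide(param).ToList()))
            .ToArray();

        // ContinueWhenAll rejects an empty array, so handle that case up front.
        if (tasks.Length == 0)
        {
            var empty = new TaskCompletionSource<List<Bar>>();
            empty.SetResult(new List<Bar>());
            return empty.Task;
        }

        // Reading t.Result rethrows any processor failure into the returned task.
        return Task.Factory.ContinueWhenAll<List<Bar>, List<Bar>>(
            tasks,
            done => done.SelectMany(t => t.Result).ToList());
    }
}
```

The caller could then attach the callback with ContinueWith, for example passing TaskScheduler.FromCurrentSynchronizationContext() so that it runs back on the UI thread.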

Upvotes: 1

magritte

Reputation: 7636

With TPL you can pass the loop state to inform the other threads to abort in the case of a timeout. You need to do something like:

    public void DoMultiOperations(Foo param, Action<IEnumerable<Bar>> callback)
    {
        ConcurrentBag<Bar> allResults = new ConcurrentBag<Bar>();

        Stopwatch sw = new Stopwatch();
        sw.Start();

        Parallel.ForEach(this.processors, (processor, loopState) =>
        {
            foreach (Bar item in processor.Provide(param))
            {
                allResults.Add(item);
            }

            if (sw.ElapsedMilliseconds > 15000)
            {
                loopState.Stop();
                throw new TimeoutException();
            }
        });

        callback(allResults);
    }

Upvotes: 0
