Reputation: 204
I have a set of asynchronous processing methods, similar to:
    public class AsyncProcessor<T>
    {
        //...rest of members, etc.

        public Task Process(T input)
        {
            //Some special processing, most likely inside a Task, so
            //maybe spawn a new Task, etc.
            Task task = Task.Run(/* maybe private method that does the processing*/);
            return task;
        }
    }
I would like to chain them all together, to execute in sequential order.
I have tried to do the following:
    public class CompositeAsyncProcessor<T>
    {
        private readonly IEnumerable<AsyncProcessor<T>> m_processors;

        //Constructor receives the IEnumerable<AsyncProcessor<T>> and
        //stores it in the field above.

        public Task ProcessInput(T input)
        {
            Task chainedTask = Task.CompletedTask;

            foreach (AsyncProcessor<T> processor in m_processors)
            {
                chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
            }

            return chainedTask;
        }
    }
However, the tasks do not run in order because, from what I have understood, inside the call to ContinueWith, the processor.Process(input) call is performed immediately and the method returns independently of the status of the returned task. Therefore, all processing Tasks still begin almost simultaneously.
My question is whether there is something elegant that I can do to chain the tasks in order (i.e. without execution overlap). Could I achieve this with, for example, the following statement (I am struggling a bit with the details)?
    chainedTask = chainedTask.ContinueWith(async t => await processor.Process(input));
Also, how would I do this without using async/await, only ContinueWith?
I want sequential execution because my Processor objects have access to, and request things from, "thread-unsafe" resources. Also, I cannot simply await each method by hand because I do not know how many there are, so I cannot just write out the necessary lines of code.
Since I may be using the term "thread-unsafe" incorrectly, an illustration explains this better. Among the "resources" used by my Processor objects, all of them have access to an object such as the following:
    public interface IRepository
    {
        void Add(object obj);

        bool Remove(object obj);

        IEnumerable<object> Items { get; }
    }
The implementation currently used is relatively naive. So some Processor objects add things, while others retrieve the Items for inspection. Naturally, one of the exceptions I get all too often is:
InvalidOperationException: Collection was modified; enumeration operation may not execute.
I could spend some time locking access and pre-running the enumerations. However, that is the second option I would resort to; my first thought was simply to make the processes run sequentially.
While I have full control in this case, assume for the purposes of the question that I might not be able to change the base implementation: what would happen if I were stuck with Tasks? Furthermore, the operations really are relatively time-consuming and CPU-bound, and I am trying to achieve a responsive user interface, so I needed to offload some of the burden to asynchronous operations. This has been useful, and in most of my use cases I do not need to chain several of them, only a single one each time (or a couple, but always a specific, known number, so I was able to hook them together without iteration or async/await). One of the use cases, however, finally required chaining an unknown number of Tasks together.
The way I am dealing with this currently is to append a call to Wait() inside the ContinueWith call, i.e.:
    foreach (AsyncProcessor<T> processor in m_processors)
    {
        chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
    }
I would appreciate any idea on how I should do this, or how I could do it more elegantly (or, "async-properly", so to speak). Also, I would like to know how I can do this without async/await.
This differs from the linked question because that question has two tasks, so the solution is simply to write the two lines required, whereas I have an arbitrary (and unknown) number of tasks, so I need a suitable iteration. Also, my method is not async. I now understand (from the single briefly available answer, which was deleted) that I could do it fairly easily if I changed my method to async and awaited the Task returned by each processor, but I still wish to know how this could be achieved without async/await syntax.
The other suggestions fall short as well, because none of them explains how to chain correctly using ContinueWith, and I am interested in a solution that uses ContinueWith and does not rely on the async/await pattern. I know this pattern may be the preferable solution, but I want to understand how (if possible) to chain an arbitrary number of tasks properly using ContinueWith calls. I now know I don't need ContinueWith. The question is, how do I do it with ContinueWith?
Upvotes: 1
Views: 742
Reputation: 43981
The method Task.ContinueWith does not understand async delegates the way Task.Run does, so when your delegate returns a Task, it treats this as a normal return value and wraps it in another Task. So you end up receiving a Task<Task> instead of what you expected to get. The problem would be obvious if AsyncProcessor.Process returned a generic Task<T>. In that case you would get a compile error, because there is no conversion from Task<Task<T>> to Task<T>. In your case the conversion is from Task<Task> to Task, which is legal, since Task<TResult> derives from Task.
Solving the problem is easy. You just need to unwrap the Task<Task> into a simple Task, and there is a built-in method, Unwrap, that does exactly that.
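The wrapping and unwrapping described above can be observed with a small sketch. SlowWork is a hypothetical stand-in for AsyncProcessor<T>.Process (here just a 200 ms delay), and UnwrapDemo is an illustrative name, not part of the question's code. It assumes a compiler that supports tuple syntax (C# 7 or later).

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical stand-in for AsyncProcessor<T>.Process: finishes after ~200 ms.
    private static Task SlowWork() => Task.Delay(200);

    public static async Task<(bool outerDoneWhileInnerRunning, bool innerDoneAfterUnwrap)> RunAsync()
    {
        // The delegate returns a Task, so ContinueWith produces a Task<Task>.
        Task<Task> wrapped = Task.CompletedTask.ContinueWith(_ => SlowWork());

        // Unwrap returns a proxy Task that completes only when the inner task does.
        Task unwrapped = wrapped.Unwrap();

        // The outer task completes as soon as SlowWork has been *started*.
        await wrapped;
        bool outerDoneWhileInnerRunning = !unwrapped.IsCompleted;

        // Awaiting the unwrapped task waits for the actual work to finish.
        await unwrapped;
        bool innerDoneAfterUnwrap = wrapped.Result.IsCompleted;

        return (outerDoneWhileInnerRunning, innerDoneAfterUnwrap);
    }
}
```

Chaining the next continuation onto wrapped would start it while the work is still running; chaining it onto unwrapped delays it until the work has finished.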
There is another problem that you need to solve, though. Currently your code suppresses all exceptions that may occur in each individual AsyncProcessor.Process call, which I don't think was intended. So you must decide which strategy to follow in this case. Are you going to propagate the first exception immediately, or do you prefer to collect them all and propagate them at the end bundled in an AggregateException, as Task.WhenAll does? The example below implements the first strategy.
    // Requires: using System.Threading; using System.Threading.Tasks;
    public class CompositeAsyncProcessor<T>
    {
        //...

        public Task Process(T input)
        {
            Task current = Task.CompletedTask;
            foreach (AsyncProcessor<T> processor in m_processors)
            {
                current = current.ContinueWith(antecessor =>
                {
                    // Stop at the first failure: skip this processor and
                    // pass the original exception down the chain.
                    if (antecessor.IsFaulted)
                        return Task.FromException(antecessor.Exception.InnerException);
                    return processor.Process(input);
                },
                    CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default
                ).Unwrap(); // Task<Task> -> Task
            }
            return current;
        }
    }
I have used an overload of ContinueWith that allows configuring all the options, because the defaults are not ideal. The default TaskContinuationOptions is None. By configuring it to ExecuteSynchronously you minimize the thread switches, since each continuation will run on the same thread that completed the previous one.
The default task scheduler is TaskScheduler.Current. By specifying TaskScheduler.Default you make it explicit that you want the continuations to run on thread-pool threads (in the exceptional cases where they cannot run synchronously). TaskScheduler.Current is context-specific, and if it ever surprises you, it won't be in a good way.
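For comparison, the second strategy mentioned earlier (let every processor run, then report all failures together) might look roughly like the sketch below. It is only an illustration under assumptions: SequentialChain and RunAllCollectingErrors are hypothetical names, and each Func<Task> stands in for a call such as () => processor.Process(input).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialChain
{
    // Runs the steps strictly one after another. A failing step does not
    // stop the chain; all failures are reported together at the end.
    public static Task RunAllCollectingErrors(IEnumerable<Func<Task>> steps)
    {
        // Only one continuation runs at a time, so a plain List suffices.
        var errors = new List<Exception>();
        Task current = Task.CompletedTask;

        foreach (Func<Task> step in steps)
        {
            current = current.ContinueWith(_ =>
            {
                // Turn a synchronous throw into a faulted task as well.
                try { return step(); }
                catch (Exception ex) { return Task.FromException(ex); }
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap().ContinueWith(t =>
            {
                // Record the failure; this continuation itself succeeds,
                // so the next step still runs.
                if (t.IsFaulted) errors.AddRange(t.Exception.InnerExceptions);
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }

        // Fault the final task if any step failed.
        return current.ContinueWith(_ =>
        {
            if (errors.Count > 0) throw new AggregateException(errors);
        },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}
```

Because the final continuation throws an AggregateException, the returned task's Exception property nests it inside another AggregateException; calling Flatten() on it yields the individual errors. Canceled steps are simply ignored in this sketch.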
As you can see, there are a lot of gotchas with the old-school ContinueWith approach. Using the modern await in a loop is a lot easier to implement, and a lot harder to get wrong.
Upvotes: 1
Reputation: 35135
foreach + await will run the Process calls sequentially.
    public async Task ProcessInputAsync(T input)
    {
        foreach (var processor in m_processors)
        {
            // Each processor starts only after the previous one has completed.
            await processor.Process(input);
        }
    }
Btw, Process should be called ProcessAsync.
Upvotes: 2