danpalmer

Reputation: 2173

Structuring Task Processing Pipeline with the Task Parallel Library

I come from an Objective-C background, where I would use Grand Central Dispatch or NSOperations to solve this problem fairly trivially. Unfortunately I think I'm stuck in that way of thinking when trying to structure this problem in C#.

I have high-level tasks, each of which has multiple parts that can happen in parallel. Each of these parts will need to go through several stages in a pipeline. I need to construct this pipeline, but also know when each high-level task completes, and execute a callback at that point.

With GCD, I would create queues to perform the parts on, each chaining to the next part of the process. All of these parts would be grouped based on the high-level task they were part of, so that a callback could be triggered at the end.

I'm struggling to work out how this would work in C#. I've mostly been researching the Task Parallel Library, but have no particular preference for what I use. One issue I have run into so far is that completion callbacks only seem to be possible with TPL pipelines when the pipeline finishes processing, but as I will have multiple tasks, that wouldn't happen.

At an overview sort of level, how would this problem best be structured? I wonder if it might be better to write the system with Rx providing the concurrency?

Upvotes: 2

Views: 2642

Answers (3)

Aleš Roubíček

Reputation: 5187

TPL is good enough to solve your scenario. I recommend reading this article on MSDN about pipelines in TPL.

Upvotes: 0

Stephen Cleary

Reputation: 456477

Sounds to me like TPL Dataflow is the right way to go. Rx can do anything that Dataflow can, but really excels at event/time management while Dataflow's syntax is cleaner for actual dataflows (including pipelines).

You are correct that Dataflow does not have any kind of per-item completion notification built-in. You'll have to add that yourself, e.g., by sticking an ActionBlock at the end of every item.

You may find my AsyncEx library useful. In particular, I have a number of asynchronous coordination primitives including AsyncCountdownEvent which sounds like it may be what you need.
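A minimal sketch of the per-item completion idea, assuming a single long-lived pipeline shared by several high-level tasks. The names Job, PartItem and PipelineSketch are hypothetical, the two stages are placeholder string operations, and the hand-rolled counter merely stands in for a countdown primitive such as AsyncCountdownEvent; only standard TPL Dataflow calls are used.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical type: counts the unfinished parts of one high-level task.
public sealed class Job
{
    private int _remaining;
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public Job(int partCount) { _remaining = partCount; }

    // Completes when the last part of this job has left the pipeline.
    public Task Completion => _done.Task;

    public void PartFinished()
    {
        if (Interlocked.Decrement(ref _remaining) == 0) _done.TrySetResult(true);
    }
}

// Hypothetical message type: one part plus the job it belongs to.
public sealed class PartItem
{
    public PartItem(Job job, string text) { Job = job; Text = text; }
    public Job Job { get; }
    public string Text { get; }
}

public static class PipelineSketch
{
    // Returns how many per-job callbacks ran.
    public static async Task<int> RunAsync()
    {
        var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        // Placeholder stages; real work would go here.
        var stage1 = new TransformBlock<PartItem, PartItem>(
            p => new PartItem(p.Job, p.Text.Trim()), parallel);
        var stage2 = new TransformBlock<PartItem, PartItem>(
            p => new PartItem(p.Job, p.Text.ToUpperInvariant()), parallel);
        // Final block: tells the owning job that one of its parts is done.
        var finish = new ActionBlock<PartItem>(p => p.Job.PartFinished());

        stage1.LinkTo(stage2);
        stage2.LinkTo(finish);

        int callbacks = 0;
        var jobs = new[] { new Job(2), new Job(3) };
        // The per-job callback is attached as a continuation on the job's completion.
        Task[] callbackTasks = jobs
            .Select(j => j.Completion.ContinueWith(_ => { Interlocked.Increment(ref callbacks); }))
            .ToArray();

        // Parts of different jobs are interleaved through the same pipeline.
        stage1.Post(new PartItem(jobs[0], " a "));
        stage1.Post(new PartItem(jobs[1], " b "));
        stage1.Post(new PartItem(jobs[0], " c "));
        stage1.Post(new PartItem(jobs[1], " d "));
        stage1.Post(new PartItem(jobs[1], " e "));

        await Task.WhenAll(callbackTasks);
        return callbacks;
    }
}
```

Note that the pipeline itself is never completed here: each job signals its own completion, so the same blocks can keep accepting parts for further jobs.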

Upvotes: 1

pkt

Reputation: 1838

I don't quite understand what you mean by completion callbacks not being an option due to having multiple tasks. Constructing a dataflow network for each task would mean the completion is triggered for them individually.

I'm guessing you would want to avoid the overhead resulting from reconstructing the network each time? In that case, maybe you could add a passthrough block of sorts at the end: it returns whatever input it's given, and also calls whatever callback you need. So for each output produced by the network, the callback would be invoked. If you want to take it a step further, it could instead post a message to another block, which can then invoke the callbacks in parallel.
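The passthrough idea could be sketched roughly as follows, assuming TPL Dataflow. CreatePassthrough and PassthroughSketch are hypothetical names, and the upstream stage is a placeholder; the block simply invokes the callback and hands its input on unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PassthroughSketch
{
    // Hypothetical helper: invokes the callback, then forwards the item unchanged.
    public static TransformBlock<T, T> CreatePassthrough<T>(Action<T> callback)
    {
        return new TransformBlock<T, T>(item =>
        {
            callback(item);
            return item;
        });
    }

    // Returns the items the callback was invoked with, in order.
    public static async Task<List<string>> DemoAsync()
    {
        var seen = new List<string>();

        // Placeholder stage standing in for the real network.
        var stage = new TransformBlock<string, string>(s => s.ToUpperInvariant());
        // Default parallelism is 1, so the unsynchronized list is safe here.
        var passthrough = CreatePassthrough<string>(s => seen.Add(s));
        // A downstream consumer is still needed so the passthrough's output drains.
        var sink = new ActionBlock<string>(s => { });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        stage.LinkTo(passthrough, link);
        passthrough.LinkTo(sink, link);

        stage.Post("a");
        stage.Post("b");
        stage.Complete();

        await sink.Completion;
        return seen;
    }
}
```

If the callbacks are slow, the passthrough could post to a separate ActionBlock with a higher MaxDegreeOfParallelism instead of calling them inline, at the cost of losing ordering between callbacks.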

Alternatively, if the pipeline is simple enough, and you don't need the extra buffering and whatnot, maybe you could do it with plain TPL tasks? Something like this:

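// Runs the parts of one high-level task in parallel, then invokes the callback.
public async Task<string> HighLevelTask(string input1, string input2, Action completed) {
    // Task.WhenAll completes once every part has finished all of its stages.
    Task<string[]> parts = Task.WhenAll(Part1(input1), Part2(input2));
    string[] results = await parts;
    completed();
    return string.Join(",", results);
}
// Each part pushes its input through the stages in sequence.
// Stage1 and Stage2 are assumed to be async methods returning Task<string>, defined elsewhere.
public async Task<string> Part1(string input) {
    var result1 = await Stage1(input);
    var result2 = await Stage2(result1);
    return result2;
}
// Part2 (not shown) would follow the same pattern as Part1.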

Upvotes: 1
