Reputation: 10152
I have a very basic, linear pipeline, in which I'd like to propagate completion and wait until everything completes:
static void Main(string[] args)
{
    ExecutePipeline().Wait();
}

static async Task ExecutePipeline()
{
    var addBlock = new TransformBlock<int, int>(x =>
    {
        var result = x + 2;
        Console.WriteLine(result);
        return result;
    });
    var subBlock = new TransformBlock<int, int>(x =>
    {
        var result = x - 2;
        Console.WriteLine(result);
        return result;
    });
    var mulBlock = new TransformBlock<int, int>(x =>
    {
        var result = x * 2;
        Console.WriteLine(result);
        return result;
    });
    var divBlock = new TransformBlock<int, int>(x =>
    {
        var result = x / 2;
        Console.WriteLine(result);
        return result;
    });

    var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
    addBlock.LinkTo(mulBlock, flowOptions);
    mulBlock.LinkTo(subBlock, flowOptions);
    subBlock.LinkTo(divBlock, flowOptions);

    addBlock.Post(4);
    addBlock.Complete();
    mulBlock.Complete();
    subBlock.Complete();
    await divBlock.Completion;
}
Unfortunately, in its current state, only the result of addBlock gets printed and the program terminates, instead of printing all of the results before termination.
If I comment out all of the lines which call Complete() on their blocks, or if I leave only addBlock.Complete() uncommented, I get a printout of all results in the pipeline, but the program never ends, since the completion is not propagated. However, if I uncomment either mulBlock.Complete() or subBlock.Complete(), the program behaves like the original code: it prints the result of addBlock and terminates.
What's interesting is that uncommenting either one of those two calls, or all of them, produces the same behavior, which makes me question how the completion propagates when one of them is commented out. Obviously, I'm missing something in the logic, but I just can't figure out what it is. How would I accomplish the desired behavior of printing all of the results?
EDIT:
So, I finally found something that worked for me at https://stackoverflow.com/a/26803579/2006048
It appears that I needed to change the last block of code to simply this:
addBlock.Post(4);
addBlock.Complete();
await addBlock.Completion;
The original code did not work because Complete() was called on each block before data could propagate, so it was a race condition.
However, this new code calls Complete() on addBlock and awaits its completion. This makes the program work as intended, but leaves me even more confused. Why must Completion be awaited on addBlock and not on the last block in the chain, which is divBlock? I would think that Complete() is only called on addBlock because PropagateCompletion is set to true, but then I would expect to wait for the completion of the last block, not the first one.
If I await the completion of mulBlock, only the result of addBlock gets printed. If I await the completion of subBlock, the results of addBlock and mulBlock get printed. If I await the completion of divBlock, the results of addBlock, mulBlock and subBlock get printed.
I was basing my code on Stephen Cleary's Concurrency in C# Cookbook example (Section 4.1 Linking Blocks (Page 48)):
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);
...
// The first block's completion is automatically propagated to the second block.
multiplyBlock.Complete();
await subtractBlock.Completion;
When I set up Cleary's code to match what I have, it exhibits the same behavior. The program prints the results and terminates only when I await multiplyBlock.Completion.
Upvotes: 2
Views: 684
Reputation: 244878
The problem is that a block completes only after all its queues are emptied, which includes the output queue. What happens in your case is that the completion propagates correctly, but then divBlock gets stuck in the "almost complete" mode, waiting for the item in its output queue to be removed.
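To see this behavior in isolation, here is a minimal sketch with a single TransformBlock that has no consumer. The class name OutputQueueDemo and the 500 ms timeout are arbitrary choices for illustration, and the code assumes the System.Threading.Tasks.Dataflow package is available and the compiler supports C# 7.1 or later. Completion should stay pending while the processed item sits in the output queue, and finish only once that item is received.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class OutputQueueDemo
{
    // Returns whether the block had completed before and after its
    // output queue was drained.
    public static async Task<(bool before, bool after)> RunAsync()
    {
        var block = new TransformBlock<int, int>(x => x / 2);

        block.Post(10);
        block.Complete();

        // Wait well beyond the time needed to process one item
        // (the 500 ms value is an arbitrary assumption).
        var finished = await Task.WhenAny(block.Completion, Task.Delay(500));
        bool before = finished == block.Completion;

        // Take the processed item out of the output queue.
        int item = await block.ReceiveAsync();
        Console.WriteLine($"Received {item}");

        // With the output queue empty, the block can now complete.
        await block.Completion;
        bool after = block.Completion.IsCompleted;

        return (before, after);
    }

    static async Task Main()
    {
        var (before, after) = await RunAsync();
        Console.WriteLine($"Completed with output pending: {before}");
        Console.WriteLine($"Completed after draining: {after}");
    }
}
```

This also suggests why awaiting addBlock.Completion appeared to work in the question: addBlock's output is consumed by mulBlock, so its queues empty out, whereas nothing consumes the output of divBlock.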
To solve this, you can either change divBlock to be an ActionBlock, or you can link it to a DataflowBlock.NullTarget<int>().
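As a rough sketch of the NullTarget option applied to the pipeline from the question: only the first block is completed explicitly, completion propagates through the links, and the last block's output is discarded so that it can finish. The Stage helper, the DrainedPipeline class, and the results list are hypothetical additions made for illustration and are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DrainedPipeline
{
    public static async Task<List<int>> RunAsync(int input)
    {
        var results = new List<int>();

        // Hypothetical helper: builds a stage that prints and records its result.
        TransformBlock<int, int> Stage(Func<int, int> op) =>
            new TransformBlock<int, int>(x =>
            {
                var result = op(x);
                lock (results) results.Add(result);
                Console.WriteLine(result);
                return result;
            });

        var addBlock = Stage(x => x + 2);
        var mulBlock = Stage(x => x * 2);
        var subBlock = Stage(x => x - 2);
        var divBlock = Stage(x => x / 2);

        var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
        addBlock.LinkTo(mulBlock, flowOptions);
        mulBlock.LinkTo(subBlock, flowOptions);
        subBlock.LinkTo(divBlock, flowOptions);

        // Discard the final output so divBlock's output queue can empty.
        divBlock.LinkTo(DataflowBlock.NullTarget<int>());

        addBlock.Post(input);

        // Complete only the head of the pipeline; the links propagate it.
        addBlock.Complete();
        await divBlock.Completion;

        return results;
    }

    static async Task Main()
    {
        // For an input of 4 this prints 6, 12, 10 and 5, then exits.
        await RunAsync(4);
    }
}
```

If the final value is not needed downstream, declaring the last stage as an ActionBlock<int> should have the same effect, since an ActionBlock has no output queue to drain.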
Upvotes: 3