Reputation: 623
Consider the following pipeline:
public static void Main()
{
    var firstBlock = new TransformBlock<string, string>(s =>
    {
        Console.WriteLine("FirstBlock");
        return s + "-FirstBlock";
    });
    var batchBlock = new BatchBlock<string>(100, new GroupingDataflowBlockOptions { Greedy = true });
    var afterBatchBlock = new TransformManyBlock<string[], string>(strings =>
    {
        Console.WriteLine("AfterBatchBlock");
        return new[] { strings[0] + "-AfterBatchBlock" };
    });
    var lastBlock = new ActionBlock<string>(s =>
    {
        Console.WriteLine($"LastBlock {s}");
    });

    firstBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x.Contains("0"));
    batchBlock.LinkTo(afterBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    afterBatchBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
    firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });

    firstBlock.Post("0");
    firstBlock.Complete();

    firstBlock.Completion.GetAwaiter().GetResult();
    batchBlock.Completion.GetAwaiter().GetResult();
    afterBatchBlock.Completion.GetAwaiter().GetResult();
    lastBlock.Completion.GetAwaiter().GetResult();
}
Running this code hangs after printing the following output:
FirstBlock
AfterBatchBlock
What I think is happening behind the scenes is the following:
My question is:
Upvotes: 1
Views: 133
Reputation: 43495
No, it's not a bug. This is the expected behavior. You must first realize that you are dealing not with a simple, straightforward dataflow pipeline but with a dataflow mesh. It's not a complex mesh, but it's still not what I would describe as a pipeline. Handling the completion of dataflow blocks that form a mesh can be tricky. In your case one block, the lastBlock, is the target of two source blocks, the firstBlock and the afterBatchBlock. This block should be completed only when both source blocks have completed, not when either one of them has. With PropagateCompletion = true on both links, the lastBlock is completed as soon as the firstBlock completes, so it declines the item that the afterBatchBlock produces afterwards, and the afterBatchBlock can never empty its output buffer and complete. So you can't use the PropagateCompletion = true option when linking this block with its sources.
There is no built-in API to do this, but implementing a two-to-one completion propagator is not very difficult. In fact you could just copy-paste the PropagateCompletion method found in this answer, and use it like this:
afterBatchBlock.LinkTo(lastBlock);
firstBlock.LinkTo(lastBlock);
PropagateCompletion(new IDataflowBlock[] { afterBatchBlock, firstBlock },
new[] { lastBlock });
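For readers without the linked answer at hand, here is a minimal sketch of what such a many-to-one propagator might look like. It is not necessarily the code from that answer: the class name DataflowCompletion, the Task return type, and the choice to treat cancelled sources as a normal completion are all assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class; the name is an assumption, not part of TPL Dataflow.
public static class DataflowCompletion
{
    // Completes (or faults) every target only after ALL sources have completed.
    // Argument validation is omitted to keep the sketch short.
    public static async Task PropagateCompletion(
        IDataflowBlock[] sources, IDataflowBlock[] targets)
    {
        // A single task that finishes when every source block has finished.
        Task allSources = Task.WhenAll(sources.Select(s => s.Completion));

        // Swallow the exception here; the task's state is inspected below.
        try { await allSources.ConfigureAwait(false); }
        catch { }

        foreach (var target in targets)
        {
            // Task.Exception is non-null only when at least one source faulted.
            if (allSources.Exception is AggregateException error)
                target.Fault(error);
            else
                target.Complete(); // assumption: cancellation is treated as completion
        }
    }
}
```

To call it unqualified as in the snippet above, the method would have to live in the same class as Main or be brought in with a using static directive. Under these assumptions the lastBlock stays open until both the firstBlock and the afterBatchBlock have finished, so the item emitted by the afterBatchBlock is no longer declined.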
Upvotes: 1