boger

Reputation: 623

How to correctly complete a branched pipeline with BatchBlock?

Consider the following pipeline:

public static void Main()
{
    var firstBlock = new TransformBlock<string, string>(s =>
    {
        Console.WriteLine("FirstBlock");
        return s + "-FirstBlock";
    });
    var batchBlock = new BatchBlock<string>(100, new GroupingDataflowBlockOptions { Greedy = true });
    var afterBatchBlock = new TransformManyBlock<string[], string>(strings =>
    {
        Console.WriteLine("AfterBatchBlock");
        return new[] { strings[0] + "-AfterBatchBlock" };
    });
    var lastBlock = new ActionBlock<string>(s =>
    {
        Console.WriteLine($"LastBlock {s}");
    });
    firstBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x.Contains("0"));
    batchBlock.LinkTo(afterBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    afterBatchBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
    firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });

    firstBlock.Post("0");

    firstBlock.Complete();
    firstBlock.Completion.GetAwaiter().GetResult();
    batchBlock.Completion.GetAwaiter().GetResult();
    afterBatchBlock.Completion.GetAwaiter().GetResult();
    lastBlock.Completion.GetAwaiter().GetResult();
}

When I run this code, it hangs after printing the following output:

FirstBlock
AfterBatchBlock

What I think is happening behind the scenes is the following:

My questions are:

  1. Is it a bug?
  2. Assuming it is by design, what is the recommended way to overcome the bad state my pipeline has reached?

Upvotes: 1

Views: 133

Answers (1)

Theodor Zoulias

Reputation: 43495

No, it's not a bug. This is the expected behavior. You must first realize that you are not dealing with a simple, straightforward dataflow pipeline, but rather with a dataflow mesh. It's not a complex mesh, but it's still not what I would describe as a pipeline. Dealing with the completion of dataflow blocks that form a mesh can be tricky. In your case you have one block, the lastBlock, which is the target of two source blocks, the firstBlock and the afterBatchBlock. This block should be completed when both source blocks have completed, not when either one of them has. With PropagateCompletion = true on both links, the lastBlock is completed as soon as the firstBlock completes, so it declines the message that the afterBatchBlock offers later; that message stays in the output buffer of the afterBatchBlock, which therefore never completes. So you can't use the PropagateCompletion = true option when linking this block with its sources.
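In case it helps to see the underlying behavior in isolation, here is a minimal sketch: once a target block has been told to complete, it declines anything offered to it afterwards. The class and method names are made up for illustration and are not part of the original code.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class CompletedTargetDemo
{
    // Returns whether a target that has already been completed
    // still accepts a new message.
    public static bool CompletedBlockAccepts()
    {
        var target = new ActionBlock<string>(s => Console.WriteLine(s));

        // Roughly what a link with PropagateCompletion = true does to the
        // target as soon as the linked source finishes.
        target.Complete();

        // A completed block declines new messages, so Post returns false.
        return target.Post("late message");
    }
}
```

A source whose output keeps being declined like this holds on to that output, and its own Completion task does not finish while the output is still pending. So the lastBlock has to stay open until both of its sources are done.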

There is no built-in API to do this, but implementing a two-to-one completion propagator is not very difficult. In fact you could just copy-paste the PropagateCompletion method found in this answer, and use it like this:

afterBatchBlock.LinkTo(lastBlock);
firstBlock.LinkTo(lastBlock);

PropagateCompletion(new IDataflowBlock[] { afterBatchBlock, firstBlock },
    new[] { lastBlock });
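For reference, here is a minimal sketch of what such a helper might look like. It is an assumption about the general shape, not necessarily the code from the linked answer: the class name DataflowCompletion is hypothetical, and this version returns a Task so that the caller can observe it if desired.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowCompletion
{
    // Hypothetical helper: completes (or faults) every target
    // only after ALL of the sources have completed.
    public static async Task PropagateCompletion(
        IDataflowBlock[] sources, IDataflowBlock[] targets)
    {
        Task allSources = Task.WhenAll(sources.Select(s => s.Completion));

        // Wait for every source; any failure is inspected below
        // instead of being rethrown here.
        try { await allSources.ConfigureAwait(false); }
        catch { }

        foreach (var target in targets)
        {
            if (allSources.IsFaulted)
                target.Fault(allSources.Exception); // forward the aggregated error
            else
                target.Complete();                  // normal (or canceled) completion
        }
    }
}
```

With the helper placed in a separate class like this, the call above would be written as DataflowCompletion.PropagateCompletion(...). The returned task can be ignored, because the existing wait on lastBlock.Completion already covers it.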

Upvotes: 1
