Reputation: 1159
I have some blocks that eventually go from a TransformBlock to one of three other transform blocks based on the LinkTo predicate. I am using DataflowLinkOptions to propagate the completion. The problem is that when a predicate is satisfied and that block is started the rest of my pipeline continues on. It would seem that the pipeline should wait for this block to finish first.
The code for this is something like this:
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status == Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status == Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status == Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);
Now, as I said, this doesn't work as I'd expect, so the only way I've found to get the behavior I want is to take the linkOptions out and add the following to the lambda for the mainBlock:
mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);
    // Complete the matching follow-up block once mainBlock itself has completed.
    if (input.Status == Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status == Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status == Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }
    return input;
});
So the question is: is this the only way to get this to work?
BTW, this has been run in my unit test with a single data item running through it to try and debug the pipeline behavior. Each block has been tested individually with multiple unit tests. So what happens in my pipeline unit test is that the assert is hit before the block finished executing and so fails.
If I remove the block2 and block3 links and debug the test using the linkOptions it works fine.
Upvotes: 5
Views: 4376
Reputation: 244918
Your problem is not with the code in your question; that works correctly: when the main block completes, all three follow-up blocks are marked for completion too.
The problem is with the end block: you're using PropagateCompletion there too, which means that when any of the three previous blocks completes, the end block is marked for completion. What you want is to mark it for completion when all three blocks complete and the Task.WhenAll().ContinueWith() combination from your answer does that (though the first part of that snippet is unnecessary, that does exactly the same thing PropagateCompletion would).
"As it turns out, the link option propagation (at least this is my guess) will propagate the completion for blocks that don't satisfy the predicate in the LinkTo."
Yes, it propagates completion always. Completion doesn't have any item associated with it, so it doesn't make any sense to apply the predicate to it. Maybe the fact that you always have only a single item (which is not common) makes this more confusing for you?
"If my guess is correct I sort of feel like this is a bug or design error in the link option completion propagation. Why should a block be complete if it was never used?"
Why shouldn't it? To me, this makes perfect sense: even when there were no items with Status.Delayed this time around, you still want to complete the block that processes those items, so that any follow-up code can know that all delayed items were already processed. And the fact that there weren't any doesn't matter.
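To see that the predicate filters items but not completion, here is a minimal sketch. The names (PredicateCompletionDemo, evens, odds) are made up for illustration and are not from the question; it assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: names are illustrative, not taken from the question.
public static class PredicateCompletionDemo
{
    public static async Task<bool> RunAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<int>();
        var evens = new ActionBlock<int>(n => Console.WriteLine($"even: {n}"));
        var odds = new ActionBlock<int>(n => Console.WriteLine($"odd: {n}"));

        // The predicate decides which items a target receives;
        // it plays no part in whether completion is propagated.
        source.LinkTo(evens, options, n => n % 2 == 0);
        source.LinkTo(odds, options, n => n % 2 != 0);

        source.Post(2);     // only the "evens" block ever receives an item
        source.Complete();

        // Both completion tasks finish, including the one for the block
        // that never saw an item.
        await Task.WhenAll(evens.Completion, odds.Completion);
        return odds.Completion.Status == TaskStatus.RanToCompletion;
    }

    public static async Task Main()
    {
        Console.WriteLine($"odds completed: {await RunAsync()}");
    }
}
```

The await on odds.Completion returns even though no odd number was ever posted, which is the behavior described above.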
Anyway, if you encounter this often, you might want to create a helper method that links several source blocks to a single target block at the same time and propagates completion correctly:
public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
    {
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
    }
}
Usage:
new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);
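For anyone who wants to try this end to end, here is a self-contained sketch under some assumptions: int items stand in for Thing, the block names (main, slow, idle1, idle2, end) and the class names are invented, and the array is typed explicitly as ISourceBlock<int>[] just to keep overload resolution obvious. The helper is repeated so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanInExtensions
{
    // Links every source to the target; the target is completed only
    // after all sources have completed.
    public static void LinkTo<T>(
        this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
        bool propagateCompletion)
    {
        foreach (var source in sources)
        {
            source.LinkTo(target);
        }

        if (propagateCompletion)
        {
            Task.WhenAll(sources.Select(source => source.Completion))
                .ContinueWith(_ => target.Complete());
        }
    }
}

// Hypothetical pipeline: one busy branch and two branches that get no items.
public static class FanInDemo
{
    public static async Task<int> RunAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        int received = 0;

        var main = new BufferBlock<int>();
        var slow = new TransformBlock<int, int>(async n => { await Task.Delay(100); return n; });
        var idle1 = new TransformBlock<int, int>(n => n);
        var idle2 = new TransformBlock<int, int>(n => n);
        var end = new ActionBlock<int>(n => { Interlocked.Increment(ref received); });

        main.LinkTo(slow, options, n => n == 1);
        main.LinkTo(idle1, options, n => n == 2);
        main.LinkTo(idle2, options, n => n == 3);

        // Fan-in: "end" is completed only after all three branches are done.
        new ISourceBlock<int>[] { slow, idle1, idle2 }.LinkTo(end, propagateCompletion: true);

        main.Post(1);       // goes to the slow branch only
        main.Complete();

        await end.Completion;
        return received;    // the slow branch's item has arrived by now
    }

    public static async Task Main()
    {
        Console.WriteLine($"items received by end block: {await RunAsync()}");
    }
}
```

Note that, like the helper above, this sketch completes the target even if a source faults; it does not forward the exception the way PropagateCompletion does.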
Upvotes: 10
Reputation: 1159
Ok. So I have to thank Cory first off. When I first read his comment I was a little annoyed because I felt like my code illustrated the concept pretty well and could be turned into a working version easily. But anyway, I felt the need to do a complete testable version I could post because of his comment.
In my test the surprising part was that, even though it mimicked my real code, the path I thought would fail passed and the path that I thought would pass failed. This made my head spin a bit. So I started to do some more permutations of the original code. Basically I created blocks that were synchronous and blocks that were asynchronous and made both kinds of pipelines. Four in total, 2 sync and 2 async; one of each used link options to propagate and the other used completion tasks in the MainBlock as shown.
After adding some task delays to the async tasks I found that the synchronous versions passed the test and the async ones failed.
So, the eventual solution to the problem was sort of none of the above. As it turns out, the link option propagation (at least this is my guess) will propagate the completion for blocks that don't satisfy the predicate in the LinkTo. So, when a Thing with a Status of Complete comes down it goes to Block1.
Oh, I should point out in the complete test code I made all blocks 1,2 & 3 connect to the same EndBlock, which is not shown in the original code.
Anyway, after the predicate is satisfied and the Thing goes to Block1, blocks 2 and 3 I believe are set to complete. This causes the EndBlock to complete which we are awaiting in the unit test and the Assert fails because Block1 isn't done doing its work yet.
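A rough sketch of that failure mode, not my actual test code: int items stand in for Thing, the block and class names are invented, and a TaskCompletionSource is used as a gate (my own addition) so the "busy" block is guaranteed to still be working when the end block completes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical reproduction; names and the gate are illustrative assumptions.
public static class EarlyCompletionDemo
{
    public static async Task<int> RunAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        var gate = new TaskCompletionSource<bool>();
        int received = 0;

        var main = new BufferBlock<int>();
        // Stands in for Block1: it stays busy until the gate is opened.
        var busy = new TransformBlock<int, int>(async n => { await gate.Task; return n; });
        var idle1 = new TransformBlock<int, int>(n => n);
        var idle2 = new TransformBlock<int, int>(n => n);
        var end = new ActionBlock<int>(n => { Interlocked.Increment(ref received); });

        main.LinkTo(busy, options, n => n == 1);
        main.LinkTo(idle1, options, n => n == 2);
        main.LinkTo(idle2, options, n => n == 3);

        // Each link propagates completion on its own, so the first branch
        // to finish completes the end block.
        busy.LinkTo(end, options);
        idle1.LinkTo(end, options);
        idle2.LinkTo(end, options);

        main.Post(1);       // only the busy branch gets an item
        main.Complete();

        // Finishes while "busy" is still waiting on the gate.
        await end.Completion;
        int seenAtCompletion = received;

        gate.SetResult(true);   // let the busy block finish; its output is now declined
        return seenAtCompletion;
    }

    public static async Task Main()
    {
        Console.WriteLine($"items seen when end block completed: {await RunAsync()}");
    }
}
```

Here the end block completes having seen no items, which matches the assert in my unit test being hit before Block1 finished.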
If my guess is correct I sort of feel like this is a bug or design error in the link option completion propagation. Why should a block be complete if it was never used?
So, here is what I did to solve the problem. I took out the link options and manually wired up the completion events. Like this:
MainBlock.Completion.ContinueWith(t =>
{
    Block1.Complete();
    Block2.Complete();
    Block3.Complete();
});
Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
    .ContinueWith(t =>
    {
        EndBlock.Complete();
    });
This worked fine, and when moved to my real code worked as well. The Task.WhenAll is what made me believe that unused blocks were set to complete and why automatic propagation was the problem.
I hope this helps someone. I will come back and add a link when I post all my test code.
Edit: Here is a link to the test code gist https://gist.github.com/jmichas/bfab9cec84f0d1e40e12
Upvotes: 0