This issue is really hard to debug: it doesn't always happen (and never within a short time, so I can't easily catch it in the debugger), and it looks like no one else has had a similar issue (I've googled for hours without finding anything related).
In short, my dataflow network works fine for a while, and then I find that the terminal block (which updates the UI) seems to have stopped working (no new data shows up on the UI), while all the upstream dataflow blocks are still working fine. It's as if the UI block has been disconnected from the other blocks.
Here is my detailed dataflow network; please take a look at it first, and then I'll explain more about the issue:
//the network graph first
[raw data block]
    -> [switching block] -> [data counting block]
                         -> [processing block] -> [ok result block]     -> [completion monitoring]
                                               -> [not ok result block] -> [completion monitoring]
//in the UI code-behind, where I consume the network and plug in some other blocks for updating,
//like this:
[ok result block]     -> [ok result counting block]
[not ok result block] -> [other ui updating]
The [ok result block] is a BroadcastBlock that pushes results to the [ok result counting block]. Part of the issue I've described is that this [ok result counting block] seems to be disconnected from the [ok result block].
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var options = new DataflowBlockOptions { EnsureOrdered = false };
//EnsureOrdered is not set here, so it keeps its default (true)
var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 80 };
//[raw data block]
var rawDataBlock = new BufferBlock<Input>(options);
//[switching block]: emits each input once more as null, which feeds the counter
var switchingBlock = new TransformManyBlock<Input,Input>(e => new[] {e, null});
//[data counting block]
var dataCountingBlock = new BroadcastBlock<Input>(null);
//[processing block]
var processingBlock = new TransformBlock<Input,int>(async e => {
    //call another api to compute the result
    var result = await …;
    //rollback the input for later processing (some kind of retry)
    if(result < 0){
        //per my logging, there is only one call dropping
        //in this case
        Task.Run(rollback);
    }
    //local function to rollback: re-queue the input at the head of the network
    async Task rollback(){
        await rawDataBlock.SendAsync(e).ConfigureAwait(false);
    }
    return result;
}, execOptions);
//[ok result block]
var okResultBlock = new BroadcastBlock<int>(null, options);
//[not ok result block]
var notOkResultBlock = new BroadcastBlock<int>(null, options);
//[completion monitoring]
var completionMonitoringBlock = new ActionBlock<int>(e => {
    if(rawDataBlock.Completion.IsCompleted && processingBlock.InputCount == 0){
        processingBlock.Complete();
    }
}, execOptions);
//connect the blocks to build the network
rawDataBlock.LinkTo(switchingBlock);
switchingBlock.LinkTo(processingBlock, e => e != null);
switchingBlock.LinkTo(dataCountingBlock, e => e == null);
processingBlock.LinkTo(okResultBlock, e => e >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e < 9);
okResultBlock.LinkTo(completionMonitoringBlock);
notOkResultBlock.LinkTo(completionMonitoringBlock);
In the UI code-behind, I plug in some other UI blocks to update the displayed info. I'm using WPF here, but I don't think that matters:
var uiBlockOptions = new ExecutionDataflowBlockOptions {
    //run the UI updates on the UI thread
    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
};
//dataCountingBlock is a BroadcastBlock<Input>, so its target must be an ActionBlock<Input>
dataCountingBlock.LinkTo(new ActionBlock<Input>(e => {
    //these are properties in the VM class, which is bound to the UI (xaml view)
    RawInputCount++;
}, uiBlockOptions));
okResultBlock.LinkTo(new ActionBlock<int>(e => {
    //these are properties in the VM class, which is bound to the UI (xaml view)
    ProcessedCount++;
    OkResultCount++;
}, uiBlockOptions));
notOkResultBlock.LinkTo(new ActionBlock<int>(e => {
    //these are properties in the VM class, which is bound to the UI (xaml view)
    ProcessedCount++;
    PendingCount = processingBlock.InputCount;
}, uiBlockOptions));
I do have code monitoring the completion status of the blocks rawDataBlock, processingBlock, okResultBlock and notOkResultBlock. I also have other logging code inside the processingBlock to help with diagnosis.
So, as I said, after a fairly long time (about 1 hour, with about 600K items processed; this number doesn't really say anything about the issue, it could be random), the network still seems to run fine, except that some counts (ok result, not ok result) stop being updated, as if the okResultBlock and notOkResultBlock were disconnected from the processingBlock, OR they were disconnected from the UI blocks (which update the UI). I have verified that the processingBlock is still working (no exception is logged and the results are still written to file), that the dataCountingBlock is still working well (new counts keep appearing on the UI), and that none of the blocks processingBlock, okResultBlock and notOkResultBlock is completed (each of their Completion tasks has a .ContinueWith that logs the status, and nothing was logged).
So it's really stuck there, and I have no clue why it would stop working like that. This kind of thing seems to happen only when using a black-box library like TPL Dataflow. I know it may be hard for you to diagnose this or imagine the possibilities, too. I'm just asking for suggestions on how to solve it, any experience you've had with similar issues, and possibly some guesses about what could cause this kind of issue in TPL Dataflow.
UPDATE:
I've reproduced the bug once more, this time with some code prepared beforehand to log information that helps debugging. The issue now narrows down to this: the processingBlock somehow does not push/post/send any message to any of its linked blocks (including the okResultBlock and notOkResultBlock), AND even a new block linked to it (prepended by using DataflowLinkOptions with Append set to false) does not receive any message (result). As I said, the processingBlock still seems to work fine (its delegate runs the code inside and logs results normally). So this is still a very strange issue.
In short, the question now becomes: why can't the processingBlock send/post its messages to any of the linked blocks? What could possibly cause that? And how can I tell whether the blocks were linked successfully (after the call to .LinkTo)?
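As for telling whether a link is actually delivering anything: `LinkTo` only returns an `IDisposable` that removes the link, so as far as I know there is no link status to query. What can be observed is the source block's queues. The sketch below is a minimal diagnostic, assuming a console project with top-level statements and the System.Threading.Tasks.Dataflow package; `SampleAsync` is a hypothetical helper and the block is a stand-in for processingBlock, not the real network. It samples `InputCount` and `OutputCount`; an `OutputCount` that stays above zero while the targets receive nothing suggests that the message at the head of the output queue is being declined by every link.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical diagnostic helper: samples a TransformBlock's queue sizes.
// If OutputCount stays above zero while linked targets receive nothing,
// the message at the head of the output queue is being declined by every link.
static async Task<int> SampleAsync<TIn, TOut>(
    TransformBlock<TIn, TOut> block, int samples, TimeSpan interval)
{
    var lastOutput = 0;
    for (var n = 0; n < samples; n++)
    {
        await Task.Delay(interval);
        lastOutput = block.OutputCount;
        Console.WriteLine($"sample {n}: input={block.InputCount}, output={lastOutput}");
    }
    return lastOutput;
}

// Stand-in for processingBlock: result 3 is rejected by the only link.
var processing = new TransformBlock<int, int>(i => i);
processing.LinkTo(DataflowBlock.NullTarget<int>(), i => i != 3);

for (var i = 0; i < 10; i++) processing.Post(i);

var stuckOutput = await SampleAsync(processing, samples: 3, interval: TimeSpan.FromMilliseconds(200));
```

In a real network the same sampling could run on a timer next to the existing completion logging; a steadily growing `OutputCount` on processingBlock would point at the links rather than at the UI blocks.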
ANSWER:
It's actually my fault: the processingBlock is indeed blocked, but it's blocked correctly, by design.
The processingBlock is blocked by 2 factors:
EnsureOrdered is true (the default), so the output is queued in processing order. On top of that, a source block offers its queued output to the linked targets one message at a time, from the head of the queue. So if one output result cannot be pushed out, it becomes a blocking item: every result processed after it simply queues up behind it. (This is also why setting EnsureOrdered to false alone would not remove the blocking.)
In my case, the special output result that cannot be pushed out is a null result, which can only be produced by some error (in the exception handling). I have 2 blocks, okResultBlock and notOkResultBlock, linked to the processingBlock, but both links are filtered to let only non-null results through. Sorry that my question does not reflect the exact code I have regarding the output type: in the question it is just a simple int, but it's actually a (nullable) class, and the actual linking code looks like this:
processingBlock.LinkTo(okResultBlock, e => e != null && e.Point >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e != null && e.Point < 9);
So the null output result is rejected by both links, stays at the head of the processingBlock's output queue, and consequently blocks all the results processed after it.
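The stall does not come from ordering alone. A minimal sketch, assuming a console project with top-level statements and the System.Threading.Tasks.Dataflow package; `string?` stands in for the real nullable output class, and item 3 plays the role of the error case. Even with `EnsureOrdered = false` and parallel processing, the null result is declined by the only (filtered) link, so the block never completes:

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stand-in for processingBlock: unordered, parallel, and item 3 yields null.
var processing = new TransformBlock<int, string?>(
    async i =>
    {
        await Task.Delay(10);               // simulate the remote API call
        return i == 3 ? null : $"item {i}"; // null plays the role of the error result
    },
    new ExecutionDataflowBlockOptions { EnsureOrdered = false, MaxDegreeOfParallelism = 4 });

// The only link rejects null, like the filtered ok/not-ok links in the answer.
processing.LinkTo(DataflowBlock.NullTarget<string?>(), s => s != null);

for (var i = 0; i < 10; i++) processing.Post(i);
processing.Complete();

// The null result is declined by every link and stays in the output queue,
// so Completion never finishes even though all input has been processed.
var first = await Task.WhenAny(processing.Completion, Task.Delay(TimeSpan.FromSeconds(2)));
var completed = first == processing.Completion;
Console.WriteLine($"completed={completed}, still queued={processing.OutputCount}");
```

How many non-null results get through before the stall can vary between runs here, because unordered output enters the queue in completion order; what does not vary is that `Completion` never finishes.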
To fix this, I set EnsureOrdered to false (this is not required to avoid the blocking, but it suits my case) and, most importantly, linked one more block to consume the null output results:
processingBlock.LinkTo(DataflowBlock.NullTarget<Output>(), e => e == null);
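The effect of that extra link can be checked in isolation. A minimal sketch, assuming a console project with top-level statements and the System.Threading.Tasks.Dataflow package; `string?` stands in for the real nullable `Output` class, item 3 simulates the error result, and `RunAsync` is a hypothetical helper. It runs the same stand-in pipeline with and without the catch-all `NullTarget` link:

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Runs a tiny stand-in pipeline and reports how many non-null results reached
// the sink and whether the sink completed before the timeout.
async Task<(int Received, bool Completed)> RunAsync(bool consumeNulls)
{
    // Item 3 maps to null, standing in for the "error" result in the answer.
    var processing = new TransformBlock<int, string?>(i => i == 3 ? null : $"item {i}");

    var received = 0;
    var sink = new ActionBlock<string?>(_ => Interlocked.Increment(ref received));

    // Like okResultBlock/notOkResultBlock, the sink only accepts non-null results.
    processing.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true }, s => s != null);

    if (consumeNulls)
    {
        // The fix: a catch-all link that discards null results.
        processing.LinkTo(DataflowBlock.NullTarget<string?>(), s => s == null);
    }

    for (var i = 0; i < 10; i++) processing.Post(i);
    processing.Complete();

    // A stalled source never completes, so wait with a timeout instead of forever.
    var first = await Task.WhenAny(sink.Completion, Task.Delay(TimeSpan.FromSeconds(2)));
    return (Volatile.Read(ref received), first == sink.Completion);
}

var withoutFix = await RunAsync(consumeNulls: false);
var withFix = await RunAsync(consumeNulls: true);
Console.WriteLine($"without NullTarget: {withoutFix}");
Console.WriteLine($"with NullTarget:    {withFix}");
```

Without the catch-all link, only the results queued ahead of the null (items 0 to 2) should get through and the sink never completes; with it, all nine non-null results arrive and completion propagates normally.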