Reputation: 1577
I'm using TPL Dataflow to build a pipeline. Logically, the pipeline should do the following:
- pollingBlock feeds monitoringBlock. Each monitoringBlock can hold only 1 item, but there are multiple monitoringBlocks.
- pollingBlock should keep processing all the items, including the one posted, in a while (true) manner.
- While occupied, a monitoringBlock should not accept any other messages; those messages should simply be discarded without further processing.
- After monitoringBlock, the message should either be marked as completed or be transferred to the next block for processing; this next block is processingBlock.
A brief sample:
public Task ExecutePipeline()
{
    var block = CreatePollingPipeline();
    block.Post((_serviceOne, _serviceTwo));
    block.Complete();
    return block.Completion;
}

public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
    var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();

    var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
    {
        while (true)
        {
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);
        }
    });

    var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
    {
        Console.WriteLine("monitoringBlock started");
        Thread.Sleep(5000);
        Console.WriteLine("monitoringBlock completed");
        return (inputs.input1, inputs.input2);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

    pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
        inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
    pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());

    var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
    {
        Console.WriteLine("processingBlock started");
        Thread.Sleep(2000);
        Console.WriteLine("processingBlock completed");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

    monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    return pollingBlock;
}
My question is: how do I keep monitoringBlock occupied until the linked processingBlock finishes its job? I don't want any items to be posted to monitoringBlock before the current message has finished the FULL processing cycle.
Upvotes: 1
Views: 1341
Reputation: 28355
As already mentioned in the comments, you can simply encapsulate the logic of monitoringBlock and processingBlock in one block; for example, you can do that with the predefined DataflowBlock.Encapsulate method.
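To illustrate the single-block idea, here is a minimal sketch that runs both stages inside one ActionBlock with BoundedCapacity = 1, rather than wiring two blocks together with DataflowBlock.Encapsulate. The class name CombinedBlockSketch, the helper CreateCombinedBlock, and the shortened delays are assumptions made for the example, not part of the original pipeline. Because the bounded block counts the item it is currently working on, Post should return false while a message is still in its monitoring or processing stage, so that message is simply dropped.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CombinedBlockSketch
{
    // Hypothetical helper: one bounded block that runs both stages back to back.
    public static ActionBlock<(string input1, string input2)> CreateCombinedBlock()
    {
        return new ActionBlock<(string input1, string input2)>(async inputs =>
        {
            Console.WriteLine("monitoring stage started");
            await Task.Delay(200);   // stands in for the monitoring work
            Console.WriteLine("processing stage started");
            await Task.Delay(200);   // stands in for the processing work
            Console.WriteLine("full cycle completed");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
    }

    public static async Task<(bool first, bool second, bool third)> RunAsync()
    {
        var block = CreateCombinedBlock();

        // The first message occupies the block for its full cycle.
        bool first = block.Post(("INP1", "INPVAL"));

        // Posted while the block is occupied: declined, so the message is lost.
        bool second = block.Post(("INP1", "INPVAL"));

        // Wait well past the end of the first cycle, then post again.
        await Task.Delay(1000);
        bool third = block.Post(("INP1", "INPVAL"));

        block.Complete();
        await block.Completion;
        return (first, second, third);
    }
}
```

In the original pipeline the same effect would come from linking pollingAlertHolder to such a block, with the NullTarget link picking up whatever the occupied block declines.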
However, if you don't want to do that, you can use an AutoResetEvent or a similar abstraction, and your code could look like this:
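// Starts signaled, so the first message passes straight through.
AutoResetEvent dataflowEvent = new AutoResetEvent(true);

// Declared before bufferBlock, because the lambda below refers to it.
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
    Console.WriteLine("monitoringBlock started");
    Thread.Sleep(5000);
    Console.WriteLine("monitoringBlock completed");
    // Lets the next message in once monitoring is done. To hold the gate for
    // the full cycle, call Set() at the end of processingBlock instead.
    dataflowEvent.Set();
    return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

var bufferBlock = new ActionBlock<(string input1, string input2)>(i =>
{
    // Blocks until the event is signaled, then forwards the message.
    dataflowEvent.WaitOne();
    monitoringBlock.Post(i);
});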
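
As a variation on the same idea, here is a hedged sketch that swaps the AutoResetEvent for a SemaphoreSlim, so that messages arriving while a cycle is in flight are dropped rather than made to wait, and the gate is released only when processingBlock finishes. The names GatedPipelineSketch, gate, and gateBlock, as well as the shortened delays, are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GatedPipelineSketch
{
    // Returns how many messages completed the full monitoring + processing cycle.
    public static async Task<int> RunAsync()
    {
        var gate = new SemaphoreSlim(1, 1);   // one full cycle in flight at a time
        int processed = 0;

        var processingBlock = new ActionBlock<(string input1, string input2)>(async inputs =>
        {
            try
            {
                await Task.Delay(200);        // stands in for the processing work
                Interlocked.Increment(ref processed);
            }
            finally
            {
                gate.Release();               // the cycle is over, reopen the gate
            }
        });

        var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(async inputs =>
        {
            await Task.Delay(200);            // stands in for the monitoring work
            return inputs;
        });
        monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

        var gateBlock = new ActionBlock<(string input1, string input2)>(inputs =>
        {
            // Wait(0) never blocks: it returns true only if the gate is free.
            if (gate.Wait(0))
            {
                monitoringBlock.Post(inputs);
            }
            // Otherwise a cycle is still running and the message is discarded.
        });

        // Three messages arrive almost at once; only the first should get through.
        gateBlock.Post(("INP1", "INPVAL"));
        gateBlock.Post(("INP1", "INPVAL"));
        gateBlock.Post(("INP1", "INPVAL"));

        gateBlock.Complete();
        await gateBlock.Completion;
        monitoringBlock.Complete();
        await processingBlock.Completion;
        return processed;
    }
}
```

The try/finally around the processing work is there so that an exception in that stage does not leave the gate closed for good.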
Upvotes: 1