Reputation: 4478
I have a set of simple blocks which are mostly processed in a serial manner but I have two blocks which I want to process in parallel (processblock1 & processblock2). I just started playing around with TPL datablocks so new to it. However in the code below, I can see paraellelblock1 is being called as but never parallelblock2 as expected. I was hoping they would both be kicked off in parallel.
class Program
{
static void Main(string[] args)
{
var readBlock = new TransformBlock<int, int>(x => DoSomething(x, "readBlock"),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1
var processBlock1 =
new TransformBlock<int, int>(x => DoSomething(x, "processBlock1")); //2
var processBlock2 =
new TransformBlock<int, int>(x => DoSomething(x, "processBlock2")); //3
var saveBlock =
new ActionBlock<int>(
x => Save(x)); //4
readBlock.LinkTo(processBlock1,
new DataflowLinkOptions { PropagateCompletion = true }); //5
readBlock.LinkTo(processBlock2,
new DataflowLinkOptions { PropagateCompletion = true }); //6
processBlock1.LinkTo(
saveBlock); //7
processBlock2.LinkTo(
saveBlock); //8
readBlock.Post(1); //10
Task.WhenAll(
processBlock1.Completion,
processBlock2.Completion)
.ContinueWith(_ => saveBlock.Complete()); //11
readBlock.Complete(); //12
saveBlock.Completion.Wait(); //13
Console.WriteLine("Processing complete!");
Console.ReadLine();
}
private static int DoSomething(int i, string method)
{
Console.WriteLine($"Do Something, callng method : { method}");
return i;
}
private static async Task<int> DoSomethingAsync(int i, string method)
{
DoSomething(i, method);
return i;
}
private static void Save(int i)
{
Console.WriteLine("Save!");
}
}
Upvotes: 0
Views: 127
Reputation: 2767
It appears that you're posting only one item to the graph, and the first consumer to consume it wins. There's no implied 'tee' functionality in the graph you've made--so there's no possible parallelism there.
Upvotes: 0
Reputation: 4302
By default tpl block will only send a message to the first linked block. Use a BroadcastBlock to send a message to many components.
void Main()
{
var random = new Random();
var readBlock = new TransformBlock<int, int>(x => { return DoSomething(x, "readBlock"); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1
var braodcastBlock = new BroadcastBlock<int>(i => i); // ⬅️ Here
var processBlock1 =
new TransformBlock<int, int>(x => DoSomething(x, "processBlock1")); //2
var processBlock2 =
new TransformBlock<int, int>(x => DoSomething(x, "processBlock2")); //3
var saveBlock =
new ActionBlock<int>(
x => Save(x)); //4
readBlock.LinkTo(braodcastBlock, new DataflowLinkOptions { PropagateCompletion = true });
braodcastBlock.LinkTo(processBlock1,
new DataflowLinkOptions { PropagateCompletion = true }); //5
braodcastBlock.LinkTo(processBlock2,
new DataflowLinkOptions { PropagateCompletion = true }); //6
processBlock1.LinkTo(
saveBlock); //7
processBlock2.LinkTo(
saveBlock); //8
readBlock.Post(1); //10
readBlock.Post(2); //10
Task.WhenAll(
processBlock1.Completion,
processBlock2.Completion)
.ContinueWith(_ => saveBlock.Complete());
readBlock.Complete(); //12
saveBlock.Completion.Wait(); //13
Console.WriteLine("Processing complete!");
}
// Define other methods and classes here
private static int DoSomething(int i, string method)
{
Console.WriteLine($"Do Something, callng method : { method} {i}");
return i;
}
private static Task<int> DoSomethingAsync(int i, string method)
{
DoSomething(i, method);
return Task.FromResult(i);
}
private static void Save(int i)
{
Console.WriteLine("Save! " + i);
}
Upvotes: 2