Reputation: 449
I need to multicast an object into multiple paths:
           producer
              |
          multicast
           |     |
     Process1   Process2
           |     |
      Writedb   WriteFile
The BroadcastBlock is not helping much: it only delivers the latest message to Process1 and Process2, so if Process2 is running late it won't receive all the messages.
The DB writer and the file writer handle different data.
Here is the code snippet:
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Program
    {
        public static void Main()
        {
            var broadCastBlock = new BroadcastBlock<int>(i => i);
            var transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 transformblock called: {0}", i);
                //Thread.Sleep(4);
                return string.Format("1_ {0},", i);
            });
            var transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 transformblock called: {0}", i);
                Thread.Sleep(100);
                return string.Format("2_ {0},", i);
            });
            var processorBlockT1 = new ActionBlock<string>(i => Console.WriteLine("processBlockT1 {0}", i));
            var processorBlockT2 = new ActionBlock<string>(i => Console.WriteLine("processBlockT2 {0}", i));
            //Linking
            broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
            broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
            transformBlock1.LinkTo(processorBlockT1, new DataflowLinkOptions { PropagateCompletion = true });
            transformBlock2.LinkTo(processorBlockT2, new DataflowLinkOptions { PropagateCompletion = true });
            const int numElements = 100;
            for (int i = 1; i <= numElements; i++)
            {
                broadCastBlock.SendAsync(i);
            }
            //completion handling
            broadCastBlock.Completion.ContinueWith(x =>
            {
                Console.WriteLine("Broadcast block completed");
                transformBlock1.Complete();
                transformBlock2.Complete();
                Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ =>
                {
                    processorBlockT1.Complete();
                    processorBlockT2.Complete();
                });
            });
            transformBlock1.Completion.ContinueWith(x => Console.WriteLine("Transform1 completed"));
            transformBlock2.Completion.ContinueWith(x => Console.WriteLine("Transform2 completed"));
            processorBlockT1.Completion.ContinueWith(x => Console.WriteLine("processblockT1 completed"));
            processorBlockT2.Completion.ContinueWith(x => Console.WriteLine("processblockT2 completed"));
            //mark completion
            broadCastBlock.Complete();
            Task.WhenAll(processorBlockT1.Completion, processorBlockT2.Completion).ContinueWith(_ => Console.WriteLine("completed both tasks")).Wait();
            Console.WriteLine("Finished");
            Console.ReadLine();
        }
    }
What is the best way to get guaranteed delivery with a broadcast, i.e., a multicast?
Should I just put a buffer in front of each consumer, so that the buffers always collect whatever comes in and each process can then take its time working through them?
Upvotes: 2
Views: 471
Reputation: 43555
The BroadcastBlock guarantees that all messages will be offered to all linked blocks, so it is exactly what you need. What you should fix, though, is the way you feed the BroadcastBlock with messages:
    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.SendAsync(i); // Don't do this!
    }
The SendAsync method is supposed to be awaited. You should never have more than one pending SendAsync operation targeting the same block. Doing so not only breaks all guarantees about the order of the received messages, but is also extremely memory-inefficient. The whole point of using bounded blocks is to control memory usage by limiting the size of the blocks' internal buffers. By issuing multiple un-awaited SendAsync calls you circumvent this self-imposed limit by creating an external dynamic buffer of incomplete Task objects, each weighing hundreds of bytes, to propagate messages that are a fraction of that size. These messages could be buffered internally much more efficiently by not making the blocks bounded in the first place.
    for (int i = 1; i <= numElements; i++)
    {
        await broadCastBlock.SendAsync(i); // Now it's OK
    }
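To check the delivery guarantee for yourself, here is a minimal self-contained sketch, not a definitive implementation. It assumes the default unbounded targets, as in the question, and awaits every SendAsync call. The names MulticastDemo, RunAsync, fast and slow are made up for illustration; the slow branch is delayed artificially to mimic a consumer that is running late.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MulticastDemo
{
    // Hypothetical helper: sends 1..numElements through a BroadcastBlock
    // into a fast and a slow consumer and reports how many each one received.
    public static async Task<(int Fast, int Slow)> RunAsync(int numElements)
    {
        var fastReceived = new List<int>();
        var slowReceived = new List<int>();

        var broadcast = new BroadcastBlock<int>(i => i);

        // Both targets keep the default (unbounded) capacity, so they never
        // decline an offered message. Each ActionBlock handles one message
        // at a time by default, so the plain List<int> is not written concurrently.
        var fast = new ActionBlock<int>(i => fastReceived.Add(i));
        var slow = new ActionBlock<int>(async i =>
        {
            await Task.Delay(5); // simulate a consumer that is running late
            slowReceived.Add(i);
        });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(fast, linkOptions);
        broadcast.LinkTo(slow, linkOptions);

        for (int i = 1; i <= numElements; i++)
        {
            await broadcast.SendAsync(i); // one pending send at a time
        }

        broadcast.Complete();
        await Task.WhenAll(fast.Completion, slow.Completion);

        return (fastReceived.Count, slowReceived.Count);
    }

    public static async Task Main()
    {
        var (fastCount, slowCount) = await RunAsync(100);
        Console.WriteLine($"fast: {fastCount}, slow: {slowCount}");
    }
}
```

With unbounded targets both counts should equal numElements, because each target queues whatever it is offered in its own input buffer. If you set BoundedCapacity on a target, the BroadcastBlock will drop messages that the target declines while it is full, which is the "only the latest" behavior you were worried about.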
Upvotes: 1