Reputation: 504
I have two TransformBlocks arranged in a loop, each linked to the other. The first is an I/O block that reads data together with some metadata and is limited to a MaxDegreeOfParallelism of 50. It passes both to the second block, which decides from the metadata whether the message should go back to the first block. If the metadata matches the criteria, the data is sent back to the I/O block after a short wait. The second block's MaxDegreeOfParallelism can be unbounded.
I have noticed that when I send a lot of data to the I/O block, it takes a long time until the messages are forwarded to the second block: around 10 minutes, and then they all arrive in a burst, about 1000 entries within a few seconds. Normally I would implement it like this:
public void Start()
{
    _ioBlock = new TransformBlock<Data, Tuple<Data, MetaData>>(async data =>
    {
        var metaData = await ReadAsync(data).ConfigureAwait(false);
        return new Tuple<Data, MetaData>(data, metaData);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

    _waitBlock = new TransformBlock<Tuple<Data, MetaData>, Data>(async dataMetaData =>
    {
        var data = dataMetaData.Item1;
        var metaData = dataMetaData.Item2;
        if (!metaData.Repost)
        {
            return null;
        }
        await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
        return data;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    _ioBlock.LinkTo(_waitBlock);
    _waitBlock.LinkTo(_ioBlock, data => data != null);
    _waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());

    foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
    {
        _ioBlock.Post(data);
    }
}
But because of the problem described above, I have to implement it like this:
public void Start()
{
    _ioBlock = new ActionBlock<Data>(async data =>
    {
        var metaData = await ReadAsync(data).ConfigureAwait(false);
        var dataMetaData = new Tuple<Data, MetaData>(data, metaData);
        _waitBlock.Post(dataMetaData);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

    _waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
    {
        var data = dataMetaData.Item1;
        var metaData = dataMetaData.Item2;
        if (metaData.Repost)
        {
            await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
            _ioBlock.Post(data);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
    {
        _ioBlock.Post(data);
    }
}
With the second approach the data is posted to the next block faster (one item at a time), but it feels like a hack to me. Does anybody know how to fix the problem? Some friends recommended TPL Pipeline, but it seems much more complicated to me.
Upvotes: 2
Views: 1100
Reputation: 504
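Problem solved. You need to set
ExecutionDataflowBlockOptions.EnsureOrdered
to false so that each result is forwarded to the next (wait) block as soon as it is ready. With the default value (true), a block releases its outputs in the same order as its inputs, so a finished item is held back until every item ahead of it has finished as well.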
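To illustrate the effect of EnsureOrdered, here is a minimal, self-contained sketch rather than the code from the question. The names EnsureOrderedDemo and RunAsync and the delay values are made up for the demo, and it assumes a version of the System.Threading.Tasks.Dataflow package that exposes EnsureOrdered. Item 0 is deliberately slow, so with ordering enabled the two fast items have to wait behind it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnsureOrderedDemo
{
    // Hypothetical helper: pushes three items through a TransformBlock and
    // returns them in the order in which the block emitted them.
    private static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var results = new List<int>();

        var block = new TransformBlock<int, int>(async i =>
        {
            // Item 0 is slow, items 1 and 2 are fast (illustrative delays).
            await Task.Delay(i == 0 ? 1000 : 10).ConfigureAwait(false);
            return i;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
            EnsureOrdered = ensureOrdered
        });

        // The ActionBlock keeps its default MaxDegreeOfParallelism of 1,
        // so the list is only touched by one delegate at a time.
        var sink = new ActionBlock<int>(i => results.Add(i));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 3; i++)
        {
            block.Post(i);
        }
        block.Complete();

        await sink.Completion.ConfigureAwait(false);
        return results;
    }

    public static async Task Main()
    {
        // Ordered: outputs come out in input order, so fast items wait for item 0.
        Console.WriteLine("EnsureOrdered = true:  " + string.Join(",", await RunAsync(true)));

        // Unordered: each output is released as soon as it is ready.
        Console.WriteLine("EnsureOrdered = false: " + string.Join(",", await RunAsync(false)));
    }
}
```

In the ordered run the results come back in input order; in the unordered run the slow item 0 would normally show up last. Applied to the question's first version, the same option would go into the ExecutionDataflowBlockOptions of the blocks in the loop, which should be safe there as long as the order of the messages does not matter.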
Further information:
Why do blocks run in this order?
Upvotes: 2