Reputation: 923
I'm working with TPL Dataflow now and I need to implement my own action block.
This action block should accept messages from two different input blocks, put these messages into single queue and then process this queue sequentially. The main point here is that two different tasks shouldn't be executed concurrently and I don't want use locks.
Here is my solution but it doesn't work properly.
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
where TInputLhs : class
where TInputRhs : class
{
public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }
private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();
private ITargetBlock<object> queue;
public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
{
queue = new ActionBlock<object>(x =>
{
if (x is TInputLhs)
{
actionLhs(x as TInputLhs);
}
else
{
actionRhs(x as TInputRhs);
}
});
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
}
public void Complete()
{
queue.Complete();
}
public Task Completion
{
get { return queue.Completion; }
}
public void Fault(Exception exception)
{
queue.Fault(exception);
}
}
Simple usage example:
static void Main(string[] args)
{
var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));
var batchBlock = new BatchBlock<string>(3);
var processInOrderBlock = new OrderedActionBlock<string, string[]>(
new Action<string>((str) =>
{
Console.WriteLine(str);
}),
new Action<string[]>((batch) =>
{
Console.WriteLine("BATCH - " + string.Join(", ", batch));
}));
splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });
for (int i = 1; i <= 10; i++)
{
splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
}
splitBlock.Complete();
processInOrderBlock.Completion.Wait();
return;
}
The output:
xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .
Looks like messages stuck in batchBlock
. And I don't know why.
Upvotes: 1
Views: 1276
Reputation: 43728
You could have a single ActionBlock
that accepts a ValueTuple
with two values, plus an index to indicate which of the two values is the valid one:
var block = new ActionBlock<(int, Type1, Type2)>(entry =>
{
var (index, item1, item2) = entry;
switch (index)
{
case 1: DoSomething1(item1); break;
case 2: DoSomething2(item2); break;
default: throw new NotImplementedException();
}
});
block.Post((1, someValue1, default));
block.Post((2, default, someValue2));
This way, by getting rid of the two intermediary BufferBlock
s, you could be sure that the order of processing will be exactly the same with the order of posting.
To make it prettier and less error prone you could make a class similar to your OrderedActionBlock
, but with "fake" ITargetBlock<TInputLhs>
and ITargetBlock<TInputRhs>
properties that are not true blocks, but just propagator facades to the single ActionBlock
. Converting from one ITargetBlock
to another is a bit tricky, but it's doable. Below is a generic implementation. The ActionBlock<TInput1, TInput2>
completes when both its Target1
and Target2
are completed, so propagating completion from linked sources should work as expected.
public class ActionBlock<TInput1, TInput2> : IDataflowBlock
{
private readonly ITargetBlock<(int, TInput1, TInput2)> _actionBlock;
public Task Completion => _actionBlock.Completion;
public void Complete() => _actionBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);
public ITargetBlock<TInput1> Target1 { get; }
public ITargetBlock<TInput2> Target2 { get; }
public ActionBlock(Func<TInput1, Task> action1, Func<TInput2, Task> action2,
ExecutionDataflowBlockOptions options = null)
{
if (action1 == null) throw new ArgumentNullException(nameof(action1));
if (action2 == null) throw new ArgumentNullException(nameof(action2));
options = options ?? new ExecutionDataflowBlockOptions();
_actionBlock = new ActionBlock<(int, TInput1, TInput2)>(entry =>
{
var (index, item1, item2) = entry;
return index switch // switch expression (C# 8.0 syntax)
{
1 => action1(item1),
2 => action2(item2),
_ => throw new NotImplementedException()
};
}, options);
this.Target1 = new TargetConverter<TInput1, (int, TInput1, TInput2)>(
_actionBlock, x => (1, x, default), () => Complete(1));
this.Target2 = new TargetConverter<TInput2, (int, TInput1, TInput2)>(
_actionBlock, x => (2, default, x), () => Complete(2));
}
// Constructor with synchronous lambdas
public ActionBlock(Action<TInput1> action1, Action<TInput2> action2,
ExecutionDataflowBlockOptions options = null) : this(
item1 => { action1(item1); return Task.CompletedTask; },
item2 => { action2(item2); return Task.CompletedTask; }, options) { }
// Complete when both targets complete
private readonly bool[] _completeState = new bool[2];
private void Complete(int index)
{
bool completed;
lock (_completeState.SyncRoot)
{
_completeState[index - 1] = true;
completed = _completeState.All(v => v);
}
if (completed) _actionBlock.Complete();
}
}
// Generic class for converting from one type of ITargetBlock to another
public class TargetConverter<TFrom, TTo> : ITargetBlock<TFrom>
{
private readonly ITargetBlock<TTo> _parent;
public readonly Func<TFrom, TTo> _convert;
public readonly Action _completeAction;
public TargetConverter(ITargetBlock<TTo> parent, Func<TFrom, TTo> convert,
Action completeAction = null)
{
if (parent == null) throw new ArgumentNullException(nameof(parent));
if (convert == null) throw new ArgumentNullException(nameof(convert));
_parent = parent;
_convert = convert;
_completeAction = completeAction;
}
Task IDataflowBlock.Completion => _parent.Completion;
void IDataflowBlock.Complete()
{
if (_completeAction != null) _completeAction(); else _parent.Complete();
}
void IDataflowBlock.Fault(Exception ex) => _parent.Fault(ex);
DataflowMessageStatus ITargetBlock<TFrom>.OfferMessage(
DataflowMessageHeader messageHeader, TFrom messageValue,
ISourceBlock<TFrom> source, bool consumeToAccept)
{
return _parent.OfferMessage(messageHeader,
_convert(messageValue),
source != null ? new SourceProxy(source, this) : null,
consumeToAccept);
}
// An internal ISourceBlock facade is also needed
private class SourceProxy : ISourceBlock<TTo>
{
private readonly ISourceBlock<TFrom> _source;
private readonly TargetConverter<TFrom, TTo> _target;
public SourceProxy(ISourceBlock<TFrom> source,
TargetConverter<TFrom, TTo> target)
{
_source = source;
_target = target;
}
TTo ISourceBlock<TTo>.ConsumeMessage(
DataflowMessageHeader messageHeader,
ITargetBlock<TTo> target,
out bool messageConsumed)
{
return _target._convert(_source.ConsumeMessage(messageHeader,
_target, out messageConsumed));
}
bool ISourceBlock<TTo>.ReserveMessage(
DataflowMessageHeader messageHeader,
ITargetBlock<TTo> target)
{
return _source.ReserveMessage(messageHeader, _target);
}
void ISourceBlock<TTo>.ReleaseReservation(
DataflowMessageHeader messageHeader,
ITargetBlock<TTo> target)
{
_source.ReleaseReservation(messageHeader, _target);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception exception)
=> throw new NotSupportedException();
IDisposable ISourceBlock<TTo>.LinkTo(
ITargetBlock<TTo> target,
DataflowLinkOptions linkOptions) => throw new NotSupportedException();
}
}
Upvotes: 1
Reputation: 923
Looks like queue
is complete when any of inputLhs
or inputRhs
is complete (if use PropagateCompletion = true
option during linking).
So, we need change this:
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
to this:
Task.WhenAll(InputLhs.Completion, InputRhs.Completion)
.ContinueWith(_ => queue.Complete());
Upvotes: 3