Jeff

Reputation: 36573

TPL DataFlow Queue with Postponement

I am processing a queue concurrently using an ActionBlock.

The one catch here is that when processing an item in the queue, I may want to wait until a dependency is satisfied by the processing of another item in the queue.

I think I should be able to do this with the TPL Dataflow library using linking, postponement, and release of postponement, but I'm not sure which constructs to use.

In pseudocode:

public class Item 
{
    public string Name { get; set; }
    public List<string> DependsOn = new List<string>();
}

// Declared up front so that the lambda below can refer to the block itself
ActionBlock<Item> block = null;
block = new ActionBlock<Item>(o => {
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn)) 
    {
       // enqueue a callback when ALL dependencies have been completed
    } 
    else 
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

var items = new[] 
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
};

Upvotes: 1

Views: 560

Answers (1)

Theodor Zoulias

Reputation: 43495

I am not sure if this will be helpful to you, but here is a custom DependencyTransformBlock class that knows about the dependencies between the items it receives, and processes each one only after its dependencies have been successfully processed. This custom block supports all the built-in functionality of a normal TransformBlock, except for the EnsureOrdered option.

The constructors of this class accept a Func<TInput, TKey> lambda for retrieving the key of each item, and a Func<TInput, IReadOnlyCollection<TKey>> lambda for retrieving its dependencies. The keys are expected to be unique. In case a duplicate key is found, the block will complete with failure.

In case of circular dependencies between items, the affected items will remain unprocessed. The property TInput[] Unprocessed lets you retrieve the unprocessed items after the completion of the block. An item can also remain unprocessed if any of its dependencies is never supplied.
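To make the rule above concrete, here is a minimal sequential sketch of the same bookkeeping, without Dataflow and without concurrency. It is not the block's implementation; the names DependencyOrderSketch and Resolve are hypothetical and exist only for this illustration. An item is processed once all of its dependencies have been processed, and anything caught in a cycle or waiting on a key that never arrives is left over.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DependencyOrderSketch
{
    // Hypothetical helper: repeatedly processes every key whose dependencies
    // are all done, until a full pass makes no progress.
    public static (List<string> Processed, List<string> Unprocessed) Resolve(
        IReadOnlyDictionary<string, string[]> dependenciesByKey)
    {
        var processed = new List<string>();
        var done = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        var remaining = new HashSet<string>(dependenciesByKey.Keys);

        bool madeProgress = true;
        while (madeProgress)
        {
            madeProgress = false;
            // Iterate over a snapshot, because 'remaining' is modified in the loop.
            foreach (var key in remaining.ToList())
            {
                if (dependenciesByKey[key].All(dep => done.Contains(dep)))
                {
                    done.Add(key);
                    processed.Add(key);
                    remaining.Remove(key);
                    madeProgress = true;
                }
            }
        }
        // Whatever is left is either part of a cycle or depends on a missing key.
        return (processed, remaining.ToList());
    }

    public static void Main()
    {
        var input = new Dictionary<string, string[]>
        {
            ["Apple"] = new[] { "Pear" },
            ["Pear"] = new string[0],
            ["A"] = new[] { "B" },      // A and B depend on each other (cycle)
            ["B"] = new[] { "A" },
            ["Fig"] = new[] { "Plum" }  // "Plum" is never supplied
        };

        var (processed, unprocessed) = Resolve(input);
        Console.WriteLine("Processed: " + string.Join(", ", processed));
        Console.WriteLine("Unprocessed: " + string.Join(", ", unprocessed));
    }
}
```

In this sketch "Pear" is always processed before "Apple", while "A", "B" and "Fig" end up in the unprocessed list, which mirrors what the Unprocessed property is described as reporting.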

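using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DependencyTransformBlock<TInput, TKey, TOutput> :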
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    private readonly ITargetBlock<TInput> _inputBlock;
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;

    private int _pendingCount = 1;
    // The initial 1 represents the completion of the _inputBlock

    private class Item
    {
        public TKey Key;
        public TInput Input;
        public bool HasInput;
        public bool IsCompleted;
        public HashSet<Item> Dependencies;
        public HashSet<Item> Dependents;

        public Item(TKey key) => Key = key;
    }

    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions =
            dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

        _items = new Dictionary<TKey, Item>(keyComparer);

        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            if (dependent.HasInput
                                && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    item.Input = default; // Cleanup
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    // Constructor with synchronous lambda
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null) : this(
            input => Task.FromResult(transform(input)),
            keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
    }

    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput && !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else
            if (condition == null || condition()) target.Complete();
    }

    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}

Usage example:

var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
dataflowBlockOptions: new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

//...

block.LinkTo(DataflowBlock.NullTarget<Item>());

In this example the block is linked to a NullTarget in order to discard its output, so that it becomes essentially an ActionBlock equivalent.
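For completeness, here is a sketch of how the whole flow might look end to end: feeding items, signaling completion, awaiting it, and then inspecting what was left over. It assumes the DependencyTransformBlock class above is compiled into the same project and that the System.Threading.Tasks.Dataflow package is referenced. The Item class is copied from the question; DoWork and the "Fig"/"Plum" items are hypothetical placeholders added for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Item
{
    public string Name { get; set; }
    public List<string> DependsOn = new List<string>();
}

public static class Program
{
    // Hypothetical stand-in for the real work done per item.
    private static void DoWork(Item item) =>
        Console.WriteLine($"Processing {item.Name}");

    public static async Task Main()
    {
        // Assumes the DependencyTransformBlock class from this answer is in scope.
        var block = new DependencyTransformBlock<Item, string, Item>(
            transform: item => { DoWork(item); return item; },
            keySelector: item => item.Name,
            dependenciesSelector: item => item.DependsOn);

        // Discard the output, so the block behaves like an ActionBlock.
        block.LinkTo(DataflowBlock.NullTarget<Item>());

        var items = new[]
        {
            new Item { Name = "Apple", DependsOn = { "Pear" } },
            new Item { Name = "Pear" },
            new Item { Name = "Fig", DependsOn = { "Plum" } } // "Plum" is never sent
        };

        foreach (var item in items)
            await block.SendAsync(item);

        block.Complete();        // No more input will arrive
        await block.Completion;  // Wait until all processable items are done

        // Items whose dependencies were never satisfied are reported here.
        foreach (var item in block.Unprocessed)
            Console.WriteLine($"Unprocessed: {item.Name}");
    }
}
```

Going by the description above, "Pear" should be processed before "Apple", and "Fig" should be reported as unprocessed because "Plum" is never supplied.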

Upvotes: 1
