Adrian S

Reputation: 544

BlockingCollection where the consumers are also producers

I have a bunch of requests to process, and processing a request can generate more "sub-requests", which are added to the same BlockingCollection. In other words, the consumers are also producers.

It's hard to know when to exit the consuming loop. No thread can call BlockingCollection.CompleteAdding, because the other threads may still add something to the collection. Nor can a thread exit the consuming loop just because the BlockingCollection is empty: another thread may have just taken the last remaining request and be about to generate more requests, in which case the Count of the BlockingCollection will increase from zero again.

My only idea so far is to use a Barrier: when all threads reach the Barrier, there can't be anything left in the BlockingCollection, and no thread can be generating new requests. Here is my code. Is this an acceptable approach? (Please note: this is a highly contrived block of code modelling a much more complex situation; no programmer really writes code that processes random strings 😊)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace Barrier1
{
    class Program
    {
        private static readonly Random random = new Random();
        private static void Main()
        {
            var bc = new BlockingCollection<string>();
            AddRandomStringsToBc(bc, 1000, true);
            int nTasks = 4;
            var barrier = new Barrier(nTasks);
            Action a = () => DoSomething(bc, barrier);
            var actions = Enumerable.Range(0, nTasks).Select(x => a).ToArray();
            Parallel.Invoke(actions);
        }

        private static IEnumerable<char> GetC(bool includeA)
        {
            var startChar = includeA ? 'A' : 'B';
            var add = includeA ? 24 : 25;
            while (true)
            {
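                int offset;
                // System.Random is not thread-safe, and GetC runs on several consumer threads at once.
                lock (random) offset = random.Next(add);
                yield return (char)(startChar + offset);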
            }
        }

        private static void DoSomething(BlockingCollection<string> bc, Barrier barrier)
        {
            while (true)
            {
                if (bc.TryTake(out var str))
                {
                    Console.WriteLine(str);
                    if (str[0] == 'A')
                    {
                        Console.WriteLine("Adding more strings...");
                        AddRandomStringsToBc(bc, 100);
                    }
                }
                else
                {
                    // Can't exit the loop here just because there is nothing in the collection.
                    // A different thread may be just about to call AddRandomStringsToBc:
                    if (barrier.SignalAndWait(100))
                    {
                        break;
                    }
                }
            }
        }

        private static void AddRandomStringsToBc(BlockingCollection<string> bc, int n, bool startWithA = false, bool sleep = false)
        {
            var collection = Enumerable.Range(0, n).Select(x => string.Join("", GetC(startWithA).Take(5)));
            foreach (var c in collection)
            {
                bc.Add(c);
            }
        }
    }
}
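The loop above depends on one property of Barrier.SignalAndWait(int): when the timeout expires before every participant has arrived, the call returns false and the caller's arrival is withdrawn, so a thread that times out can go back to taking items without being counted as idle. A small self-contained check of that behaviour (the class and member names are made up for the illustration):

```csharp
using System;
using System.Threading;

public static class BarrierTimeoutDemo
{
    public static (bool firstResult, int remainingAfterTimeout, bool bothArrived) Run()
    {
        using var barrier = new Barrier(2);

        // Only one of the two participants signals, so the wait times out and returns false.
        bool firstResult = barrier.SignalAndWait(50);

        // The timed-out arrival was withdrawn: both participants are still outstanding.
        int remaining = barrier.ParticipantsRemaining;

        // When both participants do arrive, the phase completes and both calls return true.
        bool otherResult = false;
        var other = new Thread(() => otherResult = barrier.SignalAndWait(5000));
        other.Start();
        bool myResult = barrier.SignalAndWait(5000);
        other.Join();

        return (firstResult, remaining, myResult && otherResult);
    }
}
```

In the question's DoSomething, this is what lets a thread that timed out at the barrier return to TryTake after another thread has added sub-requests.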

Upvotes: 1

Views: 965

Answers (1)

Theodor Zoulias

Reputation: 43384

Here is a collection similar to the BlockingCollection<T>, with the difference that it completes automatically instead of relying on manually calling the CompleteAdding method. The condition for the automatic completion is that the collection is empty, and all the consumers are in a waiting state.

The implementation is based on your clever idea of using a Barrier as the mechanism for checking the auto-complete condition. It's not perfect, because it relies on polling, which takes place while the collection is empty and some of its consumers are still active. On the other hand, it allows exploiting all the existing functionality of the BlockingCollection<T> class, instead of rewriting it from scratch:

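using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

/// <summary>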
/// A blocking collection that completes automatically when it's empty, and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T> : IEnumerable<T>, IDisposable
{
    private readonly BlockingCollection<T> _queue;
    private readonly Barrier _barrier;
    private volatile bool _autoCompleteStarted;
    private volatile int _intervalMilliseconds = 500;

    public AutoCompleteBlockingCollection(int boundedCapacity = -1)
    {
        _queue = boundedCapacity == -1 ? new() : new(boundedCapacity);
        _barrier = new(0, _ => _queue.CompleteAdding());
    }

    public int Count => _queue.Count;
    public int BoundedCapacity => _queue.BoundedCapacity;
    public bool IsAddingCompleted => _queue.IsAddingCompleted;
    public bool IsCompleted => _queue.IsCompleted;

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete() => _autoCompleteStarted = true;

    /// <summary>
    /// Gets or sets how frequently to check for the auto-complete condition.
    /// </summary>
    public TimeSpan CheckAutoCompleteInterval
    {
        get { return TimeSpan.FromMilliseconds(_intervalMilliseconds); }
        set
        {
            int milliseconds = checked((int)value.TotalMilliseconds);
            if (milliseconds < 0) throw new ArgumentOutOfRangeException(nameof(value));
            _intervalMilliseconds = milliseconds;
        }
    }

    public void Add(T item, CancellationToken cancellationToken = default)
        => _queue.Add(item, cancellationToken);

    public bool TryAdd(T item) => _queue.TryAdd(item);

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        _barrier.AddParticipant();
        try
        {
            while (true)
            {
                if (!_autoCompleteStarted)
                {
                    if (_queue.TryTake(out var item, _intervalMilliseconds,
                        cancellationToken))
                        yield return item;
                }
                else
                {
                    if (_queue.TryTake(out var item, 0, cancellationToken))
                        yield return item;
                    else if (_barrier.SignalAndWait(_intervalMilliseconds,
                        cancellationToken))
                        break;
                }
            }
        }
        finally { _barrier.RemoveParticipant(); }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    public void Dispose() { _barrier.Dispose(); _queue.Dispose(); }
}

The BeginObservingAutoComplete method should be called after adding the initial items to the collection. Before this method is called, the auto-complete condition is not checked.
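To see why the check has to be deferred, consider a single consumer that starts before the producer has added anything. Being the only participant, its SignalAndWait completes the phase immediately, the post-phase action calls CompleteAdding, and the producer's first Add then throws InvalidOperationException. A minimal sketch reproducing that sequence with a plain BlockingCollection<T> and Barrier, wired the same way as the class above (the demo name is hypothetical):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PrematureCompletionDemo
{
    // Returns true if the producer's Add was rejected because the queue was already completed.
    public static bool Run()
    {
        using var queue = new BlockingCollection<int>();
        // Same wiring as the answer: completing a barrier phase completes the queue.
        using var barrier = new Barrier(1, _ => queue.CompleteAdding());

        // A lone consumer starts before the producer and finds nothing to take...
        if (!queue.TryTake(out _, 0))
        {
            // ...and, as the only participant, finishes the phase immediately.
            barrier.SignalAndWait(100);
        }

        try
        {
            queue.Add(42); // the producer's first item arrives too late
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```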

The CheckAutoCompleteInterval is 500 milliseconds by default, and it can be configured at any time.

The Take and TryTake methods are missing on purpose. The collection is intended to be consumed via the GetConsumingEnumerable method, so that it can keep track of the currently subscribed consumers and know when to auto-complete. Consumers can be added and removed at any time. A consumer is removed when it exits the foreach loop, whether via break, return, etc., or via an exception.
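The removal works because GetConsumingEnumerable is an iterator: leaving a foreach early disposes the enumerator, which runs the iterator's finally block, and an exception thrown from the loop body disposes it the same way. Note also that the AddParticipant call only runs on the first MoveNext, not when GetConsumingEnumerable is called. A stripped-down sketch, with a plain counter standing in for the Barrier (all names here are hypothetical):

```csharp
using System;
using System.Collections.Generic;

public static class IteratorFinallyDemo
{
    public static int ActiveConsumers;

    static IEnumerable<int> Consume()
    {
        ActiveConsumers++;          // stands in for _barrier.AddParticipant()
        try
        {
            for (int i = 0; ; i++) yield return i;
        }
        finally
        {
            ActiveConsumers--;      // stands in for _barrier.RemoveParticipant()
        }
    }

    public static (int before, int during, int after) Run()
    {
        var consumer = Consume();
        int before = ActiveConsumers; // still 0: the iterator body has not started yet

        int during = -1;
        foreach (int x in consumer)
        {
            during = ActiveConsumers;
            if (x == 3) break;      // foreach disposes the enumerator, which runs the finally block
        }
        return (before, during, ActiveConsumers);
    }
}
```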

Usage example:

private static void Main()
{
    var bc = new AutoCompleteBlockingCollection<string>();
    AddRandomStringsToBc(bc, 1000, true);
    bc.BeginObservingAutoComplete();
    Action action = () => DoSomething(bc);
    var actions = Enumerable.Repeat(action, 4).ToArray();
    Parallel.Invoke(actions);
}

private static void DoSomething(AutoCompleteBlockingCollection<string> bc)
{
    foreach (var str in bc.GetConsumingEnumerable())
    {
        Console.WriteLine(str);
        if (str[0] == 'A')
        {
            Console.WriteLine("Adding more strings...");
            AddRandomStringsToBc(bc, 100);
        }
    }
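}

// The question's helper takes a BlockingCollection<string>; this overload accepts the
// new collection type instead (GetC is the question's random-character iterator).
private static void AddRandomStringsToBc(AutoCompleteBlockingCollection<string> bc,
    int n, bool startWithA = false)
{
    for (int i = 0; i < n; i++)
        bc.Add(string.Join("", GetC(startWithA).Take(5)));
}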

The collection is thread-safe, with the exception of the Dispose method.
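If every sub-request is added through code you control, a different technique avoids the polling altogether: keep a count of outstanding work (items queued plus items being processed), increment it before each Add, and decrement it only after an item's processing, including any sub-requests it adds, has finished. The count can then reach zero only when the queue is empty and no consumer can still produce more work, and whichever consumer brings it to zero calls CompleteAdding. This is not the Barrier approach used above; it's a sketch in which a made-up rule (items below depth 2 spawn two children) stands in for the real sub-requests.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PendingCounterDemo
{
    // Returns how many items were processed in total.
    public static int Run(int initialItems, int consumers)
    {
        using var queue = new BlockingCollection<int>();
        int pending = 0;   // items queued + items currently being processed
        int processed = 0;

        void Enqueue(int depth)
        {
            Interlocked.Increment(ref pending); // count the item before it becomes visible
            queue.Add(depth);
        }

        for (int i = 0; i < initialItems; i++) Enqueue(0);
        if (Volatile.Read(ref pending) == 0) queue.CompleteAdding(); // nothing to do at all

        void Consume()
        {
            foreach (int depth in queue.GetConsumingEnumerable())
            {
                Interlocked.Increment(ref processed);
                // Hypothetical rule standing in for "sub-requests":
                // items below depth 2 spawn two children each.
                if (depth < 2) { Enqueue(depth + 1); Enqueue(depth + 1); }
                // Children were counted above, so reaching zero here means the queue
                // is empty and no other consumer is still processing an item.
                if (Interlocked.Decrement(ref pending) == 0) queue.CompleteAdding();
            }
        }

        Parallel.Invoke(Enumerable.Repeat((Action)Consume, consumers).ToArray());
        return processed;
    }
}
```

With 10 initial items, each one accounts for 1 + 2 + 4 = 7 processed items, so Run(10, 4) processes 70 items and then every consumer's foreach ends on its own.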

Upvotes: 4
