avo
avo

Reputation: 10711

Is there anything like asynchronous BlockingCollection<T>?

I would like to await on the result of BlockingCollection<T>.Take() asynchronously, so I do not block the thread. Looking for anything like this:

var item = await blockingCollection.TakeAsync();

I know I could do this:

var item = await Task.Run(() => blockingCollection.Take());

but that kinda kills the whole idea, because another thread (of ThreadPool) gets blocked instead.

Is there any alternative?

Upvotes: 115

Views: 33266

Answers (4)

Theodor Zoulias
Theodor Zoulias

Reputation: 43555

The asynchronous (non-blocking) alternative of the BlockingCollection<T> is the Channel<T> class. It offers almost the same functionality, plus some extra features. You can instantiate a Channel<T> using the Channel's static factory methods, as shown below (demonstrating the default values of all available options).

Channel<Item> channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions()
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
});
Channel<Item> channel = Channel.CreateBounded<Item>(new BoundedChannelOptions(capacity)
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
    FullMode = BoundedChannelFullMode.Wait,
});

The most striking difference is that the Channel<T> exposes a Writer and a Reader facade. So you can pass the Writer facade to a method that plays the role of the producer, and similarly the Reader facade to a method that plays the role of the consumer. The Writer is only allowed to add items in the channel, and mark it as completed. The Reader is only allowed to take items from the channel, and await its completion. Both facades expose only non-blocking APIs. For example the ChannelWriter<T> has a WriteAsync method that returns a ValueTask. If you have some reason to block on these APIs, for example if one worker of your producer/consumer pair has to be synchronous, then you can block with .AsTask().GetAwaiter().GetResult(), but this will not be as efficient as using a BlockingCollection<T>. If you want to learn more about the similarities and differences between the Channel<T> and BlockingCollection<T> classes, take a look at this answer.

An implementation of a custom AsyncBlockingCollection<T> class, having only the most basic features, can be found in the 3rd revision of this answer.

Upvotes: 11

mightypanda
mightypanda

Reputation: 1

This is super-simple, but it serves my needs.

    public static class BlockingCollectionEx
    {
        public async static Task<T> TakeAsync<T>(this BlockingCollection<T> bc, CancellationToken token, int inner_delay = 10)
        {
            while (!token.IsCancellationRequested)
            {
                if (bc.TryTake(out T el))
                    return el;
                else
                    await Task.Delay(inner_delay);
            }

            throw new OperationCanceledException();
        }
    }

Upvotes: -1

Stephen Cleary
Stephen Cleary

Reputation: 456637

There are four alternatives that I know of.

The first is Channels, which provides a threadsafe queue that supports asynchronous Read and Write operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.

Upvotes: 137

John Leidegren
John Leidegren

Reputation: 61007

...or you can do this:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5 before that, this was not all that straightforward.

Upvotes: 27

Related Questions