Theodor Zoulias

Reputation: 43545

Producer consumer using a bounded Channel<T>, with strict memory allocation requirements

Here is my scenario¹. I have a producer-consumer system composed of two producers, one consumer, and a bounded Channel<T> configured with capacity 2. The T is byte[].

Channel<byte[]> channel = Channel.CreateBounded<byte[]>(2);

The byte[]s propagated through the channel are huge (1GB each), so the total number of arrays that exist at any given moment has to be kept to a minimum. That's why the two producers wait, before creating a new byte[1_000_000_000], until they know that there is empty space in the channel. Here is the first producer:

Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await channel.Writer.WaitToWriteAsync();

        // The channel has space available. Let's create the array.
        byte[] array = new byte[1_000_000_000];

        // Here initialize the array (mainly I/O bound, time consuming)

        // At this moment the channel might be full,
        // because the other producer filled the gap.
        await channel.Writer.WriteAsync(array);
    }
});

The second producer is identical. Unfortunately this allows both producers to start creating a new array, even when there is only one empty slot in the channel. So at some moment the system might have 4 huge arrays alive at the same time: 1 being consumed by the consumer, 1 stored in the channel, and 2 created concurrently by the two producers (trying to fill a single empty slot).

I want to limit the total number of arrays in managed memory to 3. Is there any way to tame my producers, so that they don't start creating a new byte[] until there is certainly space available for it in the channel? In other words, after creating an array, the producer should be able to write it into the channel immediately, like this:

bool success = channel.Writer.TryWrite(array);

...and the success should always be true.

¹ This scenario is contrived. It was inspired by a recent GitHub issue.


Clarification: The construction and initialization of the byte arrays is the exclusive responsibility of the producer, and it should stay this way. Delegating the construction work elsewhere, either partially or fully, is not desirable.

Upvotes: 1

Views: 1294

Answers (4)

Stephen Cleary

Reputation: 456587

The construction and initialization of the byte arrays is the exclusive responsibility of the producer, and it should stay this way. Delegating the construction work elsewhere, either partially or fully, is not desirable.

In that case, you can use the same kind of limiter to limit "tokens", if you will, where each token is an authorization to allocate one array.

Note that the Disposable type is from my Nito.Disposables package.

public sealed class TokenAllocator
{
  private readonly SemaphoreSlim _mutex;

  public TokenAllocator(int maxTokens) =>
      _mutex = new(maxTokens);

  public async Task<IDisposable> AllocateAsync()
  {
    await _mutex.WaitAsync();
    return Disposable.Create(() => _mutex.Release());
  }
}
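If you'd rather not take a dependency on that package, the token can be hand-rolled. The following is a minimal sketch, assuming .NET Core 3.0 or later; the private Token class is a hypothetical name introduced here. It also passes maxTokens as the semaphore's maxCount, so an accidental extra Release throws SemaphoreFullException instead of silently raising the limit.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class TokenAllocator
{
    private readonly SemaphoreSlim _mutex;

    // maxCount == initialCount, so the limit can never be exceeded by over-releasing.
    public TokenAllocator(int maxTokens) =>
        _mutex = new SemaphoreSlim(maxTokens, maxTokens);

    // Waits until a token is available; disposing the token gives it back.
    public async Task<IDisposable> AllocateAsync(CancellationToken cancellationToken = default)
    {
        await _mutex.WaitAsync(cancellationToken).ConfigureAwait(false);
        return new Token(_mutex);
    }

    // Hypothetical helper type: releases the semaphore at most once.
    private sealed class Token : IDisposable
    {
        private SemaphoreSlim? _mutex;

        public Token(SemaphoreSlim mutex) => _mutex = mutex;

        // Interlocked.Exchange turns a second Dispose call into a no-op.
        public void Dispose() => Interlocked.Exchange(ref _mutex, null)?.Release();
    }
}
```

The usage below works unchanged with this version, because the public surface (the constructor and AllocateAsync) is the same.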

Usage:

var allocator = new TokenAllocator(3);
var channel = Channel.CreateBounded<(IDisposable token, byte[] item)>(2);

var consumer = Task.Run(async () =>
{
  await foreach (var (token, item) in channel.Reader.ReadAllAsync())
  using (token)
  {
    // ... Do something with `item`
  }
});

var producer1 = Task.Run(async () =>
{
  while (true)
  {
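    var token = await allocator.AllocateAsync();
    byte[] item; // declared outside the try block so it's in scope for WriteAsync
    try
    {
      item = new byte[1_000_000_000];
      // ... Initialize `item`
    }
    catch
    {
      // Initialization failed: give the token back before propagating the error.
      token.Dispose();
      throw;
    }
    await channel.Writer.WriteAsync((token, item));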
  }
});

Upvotes: 2

Theodor Zoulias
Theodor Zoulias

Reputation: 43545

Here is another approach, which also uses a SemaphoreSlim like in Stephen Cleary's and to11mtm's answers. The difference is that the initialCount and maxCount of the semaphore are initialized to the exact capacity of the channel, and the semaphore is released by the consumer immediately after a byte[] is taken from the channel, not when it's fully processed:

const int capacity = 2;
Channel<byte[]> channel = Channel.CreateBounded<byte[]>(capacity);
SemaphoreSlim semaphore = new(capacity, capacity);

Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await semaphore.WaitAsync();
        try
        {
            byte[] array = new byte[1_000_000_000];
            // Here initialize the array...
            await channel.Writer.WriteAsync(array);
        }
        catch
        {
            semaphore.Release();
            throw;
        }
    }
});

Task consumer = Task.Run(async () =>
{
    await foreach (byte[] array in channel.Reader.ReadAllAsync())
    {
        semaphore.Release();
        // Here consume the array...
    }
});

Essentially the SemaphoreSlim becomes the guard that limits the capacity of the channel. You could even use an unbounded channel instead, since the semaphore already enforces the limit.

This approach ensures that the maximum number of byte[]s allocated at any time will be 3, excluding those that are eligible for garbage collection.
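To sanity-check the bound of 3, here is a minimal test harness for the approach above. It's a sketch under several assumptions: the class name SemaphoreGuardDemo is hypothetical, the arrays are shrunk to 1,000 bytes, and Task.Yield/Task.Delay stand in for the real initialization and processing work. An array counts as "live" from its allocation until the consumer finishes processing it.

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SemaphoreGuardDemo
{
    // Runs two producers and one consumer with the semaphore-guarded channel and
    // returns the highest number of arrays that were in use at the same time.
    public static async Task<int> RunAsync(int itemsPerProducer = 20)
    {
        const int capacity = 2;
        Channel<byte[]> channel = Channel.CreateBounded<byte[]>(capacity);
        SemaphoreSlim semaphore = new SemaphoreSlim(capacity, capacity);
        int live = 0, maxLive = 0;

        void TrackAllocation()
        {
            int current = Interlocked.Increment(ref live);
            // Lock-free "max" update: retry until maxLive >= current.
            int observed;
            while (current > (observed = Volatile.Read(ref maxLive)) &&
                   Interlocked.CompareExchange(ref maxLive, current, observed) != observed)
            {
            }
        }

        async Task ProduceAsync()
        {
            for (int i = 0; i < itemsPerProducer; i++)
            {
                await semaphore.WaitAsync();
                try
                {
                    byte[] array = new byte[1_000]; // small stand-in for the 1GB array
                    TrackAllocation();
                    await Task.Yield();             // stand-in for initializing the array
                    await channel.Writer.WriteAsync(array);
                }
                catch
                {
                    semaphore.Release();
                    throw;
                }
            }
        }

        async Task ConsumeAsync()
        {
            await foreach (byte[] array in channel.Reader.ReadAllAsync())
            {
                semaphore.Release();             // free the slot as soon as the array is taken
                await Task.Delay(1);             // stand-in for consuming the array
                Interlocked.Decrement(ref live); // the array is no longer in use
            }
        }

        Task consumer = Task.Run(() => ConsumeAsync());
        await Task.WhenAll(Task.Run(() => ProduceAsync()), Task.Run(() => ProduceAsync()));
        channel.Writer.Complete();
        await consumer;
        return Volatile.Read(ref maxLive);
    }
}
```

A result of 3 or less is expected: at most 2 arrays are covered by semaphore slots (being built or sitting in the channel), plus the 1 the consumer is working on.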

Upvotes: 0

Stephen Cleary

Reputation: 456587

Buffered producer/consumer systems like channels and Dataflow are a little "fuzzy" around maximum buffer sizes (I can never remember if Dataflow counts items in output buffers or not). And as you point out, they don't count any items held by producers or consumers.

So, in order to restrict the total number of objects at any time, you'll need your own allocator.

public sealed class LimitedAllocator<T>
{
  private readonly SemaphoreSlim _mutex;

  public LimitedAllocator(int maxItems) =>
      _mutex = new(maxItems);

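  public async Task<AllocatedItem> AllocateAsync(Func<T> create)
  {
    await _mutex.WaitAsync();
    try
    {
      return new(this, create());
    }
    catch
    {
      // `create` threw, so no item exists; give the slot back.
      _mutex.Release();
      throw;
    }
  }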

  private void Free() => _mutex.Release();

  public sealed class AllocatedItem : IDisposable
  {
    public AllocatedItem(LimitedAllocator<T> allocator, T item)
    {
      Item = item;
      _disposer = Disposable.Create(() => allocator.Free());
    }
    public T Item { get; }
    public void Dispose() => _disposer.Dispose();
    private readonly IDisposable _disposer;
  }
}

Usage:

var allocator = new LimitedAllocator<byte[]>(3);
var channel = Channel.CreateBounded<LimitedAllocator<byte[]>.AllocatedItem>(2);

var consumer = Task.Run(async () =>
{
  await foreach (var allocatedItem in channel.Reader.ReadAllAsync())
  using (allocatedItem)
  {
    // ... Do something with allocatedItem.Item
  }
});

var producer1 = Task.Run(async () =>
{
  while (true)
  {
    var allocatedItem = await allocator.AllocateAsync(() => new byte[1_000_000_000]);
    // ... Do something with allocatedItem.Item
    await channel.Writer.WriteAsync(allocatedItem);
  }
});

Notes:

  • The producer example assumes exceptions will cause an app failure. If exceptions must be recovered from, then the ... in the producer needs a try/catch around it that disposes the allocated item only in the case of exceptions.
  • In the case where the T in LimitedAllocator<T> is just memory (e.g., byte[]), you can consider using an IMemoryOwner<T> (from System.Buffers) instead of AllocatedItem. IMemoryOwner<T> is essentially a disposable combined with memory.
  • The producer no longer waits to see if there is room available in the channel; it just waits to see if there's room available in the allocator. If there's room available in the allocator, then that producer becomes the one that will create an item to send into the channel.
  • If you strongly dislike the AllocatedItem pairing of IDisposable-with-item, then it's possible to use connected properties. But that tends towards more magic and less maintainable code.
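Following the IMemoryOwner<T> suggestion in the notes, here is a minimal sketch of an allocator that hands out IMemoryOwner<byte> instances directly. The names LimitedMemoryAllocator and Owner are hypothetical, and the fixed bufferSize is an assumption made to keep it short. The channel would then be a Channel<IMemoryOwner<byte>>, and the consumer disposes each owner when it's done with it.

```csharp
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical allocator: at most maxBuffers owners exist until they are disposed.
public sealed class LimitedMemoryAllocator
{
    private readonly SemaphoreSlim _slots;
    private readonly int _bufferSize;

    public LimitedMemoryAllocator(int maxBuffers, int bufferSize)
    {
        _slots = new SemaphoreSlim(maxBuffers, maxBuffers);
        _bufferSize = bufferSize;
    }

    public async Task<IMemoryOwner<byte>> AllocateAsync(CancellationToken cancellationToken = default)
    {
        await _slots.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            return new Owner(this, new byte[_bufferSize]);
        }
        catch
        {
            // Allocation failed (e.g. OutOfMemoryException): give the slot back.
            _slots.Release();
            throw;
        }
    }

    private sealed class Owner : IMemoryOwner<byte>
    {
        private LimitedMemoryAllocator? _allocator;
        private byte[]? _array;

        public Owner(LimitedMemoryAllocator allocator, byte[] array)
        {
            _allocator = allocator;
            _array = array;
        }

        public Memory<byte> Memory =>
            _array ?? throw new ObjectDisposedException(nameof(Owner));

        public void Dispose()
        {
            // Drop the array reference so it becomes eligible for GC,
            // then free the slot exactly once.
            _array = null;
            Interlocked.Exchange(ref _allocator, null)?._slots.Release();
        }
    }
}
```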

Upvotes: 2

to11mtm

Reputation: 168

One option, although it requires a little manual management, would be a simple wrapper around a channel that limits the size.

public class LockingFixedPool<T>
{
    private readonly Channel<T> Items;
    
    public LockingFixedPool(T[] seeded)
    {
        Items = Channel.CreateBounded<T>(seeded.Length);
        foreach (var element in seeded)
        {
            Items.Writer.TryWrite(element);
        }
    }
    public async Task<T> GetItemAsync()
    {
        return await Items.Reader.ReadAsync();
    }
    public void Release(T item)
    {
        //unless you make this specific to arrays,
        //you should clear before calling release.
        Items.Writer.TryWrite(item);
    }
}

The biggest problem with this 'bare' implementation is that you have to manually manage when the item is released. As an alternative, you could return a 'wrapped' instance:

public class LockPooledItem<T> : IDisposable
{
    public T Item {get;}
    private LockingFixedPool<T> _pool;
    public LockPooledItem(T item, LockingFixedPool<T> pool)
    {
        Item = item;
        _pool = pool;
    }
    public void Dispose()
    {
        _pool.Release(Item);
    }
}

And have LockingFixedPool return instances of those. Of course, you'll then be allocating the tracking object, but it's a trade-off between composition and manual code for managing the allocation.
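Here is a minimal sketch of that composition. FixedPool<T>, Lease and RentAsync are hypothetical names, not part of any library. One design choice worth calling out: Dispose returns the item at most once. With a wrapper whose Dispose simply calls Release, disposing twice while the pool has free space would write the same item into the channel twice, and two producers could then rent the same array.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch: a fixed pool whose rent method returns a disposable lease,
// so callers give items back with Dispose()/using instead of a manual Release.
public sealed class FixedPool<T>
{
    private readonly Channel<T> _items;

    public FixedPool(T[] seeded)
    {
        _items = Channel.CreateBounded<T>(seeded.Length);
        foreach (T element in seeded)
            _items.Writer.TryWrite(element);
    }

    // Waits until an item is available in the pool.
    public async Task<Lease> RentAsync(CancellationToken cancellationToken = default)
    {
        T item = await _items.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
        return new Lease(this, item);
    }

    public sealed class Lease : IDisposable
    {
        private FixedPool<T>? _pool;

        internal Lease(FixedPool<T> pool, T item)
        {
            _pool = pool;
            Item = item;
        }

        public T Item { get; }

        public void Dispose()
        {
            // Return the item at most once; a second Dispose is a no-op,
            // so the same item cannot end up in the pool twice.
            FixedPool<T>? pool = Interlocked.Exchange(ref _pool, null);
            pool?._items.Writer.TryWrite(Item);
        }
    }
}
```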

Edit per comment:

//This is for the sake of simplicity in example
SingletonHolder.LockedGibiByteArrayPool = new LockingFixedPool<byte[]>(new [] { new byte[1_000_000_000], new byte[1_000_000_000] });
Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await channel.Writer.WaitToWriteAsync();

        // Grab an array from our pool.
        byte[] array = await SingletonHolder.LockedGibiByteArrayPool.GetItemAsync();

        // Here initialize the array (mainly I/O bound, time consuming)

        // The channel should not be full,
        // But the reader -must- make sure to release the array
        // when it is done.
        // alternatively, use the `LockPooledItem` pattern suggested above,
        // and then at least it's just a `Dispose()` call...
        await channel.Writer.WriteAsync(array);
    }
});

Upvotes: 3
