Reputation: 43545
Here is my scenario¹. I have a producer-consumer system composed of two producers, one consumer, and a bounded Channel<T> configured with capacity 2. The T is byte[].
Channel<byte[]> channel = Channel.CreateBounded<byte[]>(2);
The byte[]s propagated through the channel are huge (1GB each), so I need to keep the total number of arrays alive at any given moment to a minimum. For this reason the two producers wait before creating a new byte[1_000_000_000] until they know that there is empty space in the channel. Here is the first producer:
Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await channel.Writer.WaitToWriteAsync();
        // The channel has space available. Let's create the array.
        byte[] array = new byte[1_000_000_000];
        // Here initialize the array (mainly I/O bound, time consuming)
        // At this moment the channel might be full,
        // because the other producer filled the gap.
        await channel.Writer.WriteAsync(array);
    }
});
The second producer is identical. Unfortunately, this allows both producers to start creating a new array even when there is only one empty slot in the channel. So at some moment the system might have 4 huge arrays alive at the same time: 1 being processed by the consumer, 1 stored in the channel, and 2 created concurrently by the two producers (both trying to fill the single empty slot).
I want to limit the total number of arrays in managed memory to 3. Is there any way to tame my producers, so that they don't start creating a new byte[] until there is certainly space available for it in the channel? In other words, after creating an array, the producer should be able to write it into the channel immediately, like this:
bool success = channel.Writer.TryWrite(array);
...and success should always be true.
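The race can be reproduced deterministically. The following is a minimal sketch, assuming .NET 6 or later with top-level statements; a channel of capacity 1 and 16-byte arrays stand in for the single free slot and the 1GB arrays of the real scenario. Both WaitToWriteAsync calls report free space because neither of them reserves the slot, so the second TryWrite fails.

```csharp
using System;
using System.Threading.Channels;

// Assumption: capacity 1 models "exactly one free slot" in the real capacity-2 channel.
var channel = Channel.CreateBounded<byte[]>(1);

// Both producers ask whether there is room. Neither call reserves the slot,
// so both complete with true while the slot is still empty.
bool producer1MayWrite = await channel.Writer.WaitToWriteAsync();
bool producer2MayWrite = await channel.Writer.WaitToWriteAsync();

// Both producers now allocate (small stand-ins for the 1GB arrays)...
byte[] array1 = new byte[16];
byte[] array2 = new byte[16];

// ...but only one of the two arrays fits in the channel.
bool written1 = channel.Writer.TryWrite(array1);
bool written2 = channel.Writer.TryWrite(array2);

Console.WriteLine($"{producer1MayWrite} {producer2MayWrite} {written1} {written2}"); // prints "True True True False"
```

WaitToWriteAsync only answers "is there space right now?", which is why the producers need some other mechanism to claim a slot before allocating.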
¹ This scenario is contrived. It was inspired by a recent GitHub issue.
Clarification: The construction and initialization of the byte arrays is the exclusive responsibility of the producer, and it should stay this way. Delegating the construction work elsewhere, either partially or fully, is not desirable.
Upvotes: 1
Views: 1294
Reputation: 456587
The construction and initialization of the byte arrays is the exclusive responsibility of the producer, and it should stay this way. Delegating the construction work elsewhere, either partially or fully, is not desirable.
In that case, you can use the limiter to limit "tokens", if you will, where each token is an authorization to allocate.
Note that the Disposable type is from my package.
public sealed class TokenAllocator
{
    private readonly SemaphoreSlim _mutex;

    public TokenAllocator(int maxTokens) =>
        _mutex = new(maxTokens);

    public async Task<IDisposable> AllocateAsync()
    {
        await _mutex.WaitAsync();
        return Disposable.Create(() => _mutex.Release());
    }
}
Usage:
var allocator = new TokenAllocator(3);
var channel = Channel.CreateBounded<(IDisposable token, byte[] item)>(2);
var consumer = Task.Run(async () =>
{
    await foreach (var (token, item) in channel.Reader.ReadAllAsync())
    {
        using (token)
        {
            // ... Do something with `item`
        }
    }
});
var producer1 = Task.Run(async () =>
{
    while (true)
    {
        var token = await allocator.AllocateAsync();
        byte[] item; // declared outside the try so it is still in scope for WriteAsync
        try
        {
            item = new byte[1_000_000_000];
            // ... Do something with `item`
        }
        catch
        {
            token.Dispose();
            throw;
        }
        await channel.Writer.WriteAsync((token, item));
    }
});
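To see why the tokens cap the total at 3 while the channel's own capacity does not, here is a small deterministic walk-through. It is a sketch under stated assumptions: a plain SemaphoreSlim with 3 permits stands in for TokenAllocator(3), so no Disposable type is needed, and 16-byte arrays stand in for the 1GB ones.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

// Assumption: a SemaphoreSlim with 3 permits stands in for TokenAllocator(3);
// one permit = one "token" = permission to have one array alive.
var tokens = new SemaphoreSlim(3);
var channel = Channel.CreateBounded<byte[]>(2);

// The producers take a token before every allocation and fill the channel.
for (int i = 0; i < 2; i++)
{
    await tokens.WaitAsync();
    channel.Writer.TryWrite(new byte[16]); // small stand-in for a 1GB array
}

// The consumer takes one array out. Its token stays taken until it is done with it.
channel.Reader.TryRead(out var beingConsumed);

// The channel now has a free slot, and the third token lets a producer fill it...
bool thirdToken = tokens.Wait(0);
channel.Writer.TryWrite(new byte[16]);

// ...but a fourth allocation is refused: 1 array in the consumer + 2 in the channel.
bool fourthToken = tokens.Wait(0);

// The consumer finishes and disposes its token; the next allocation may proceed.
tokens.Release();
bool fourthTokenAfterRelease = tokens.Wait(0);

Console.WriteLine($"{thirdToken} {fourthToken} {fourthTokenAfterRelease}"); // prints "True False True"
```

The key point is that a token is returned only when the consumer has finished with the array, not when the array leaves the channel.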
Upvotes: 2
Reputation: 43545
Here is another approach, which also uses a SemaphoreSlim like Stephen Cleary's and to11mtm's answers. The difference is that the initialCount and maxCount of the semaphore are set to the exact capacity of the channel, and the consumer releases the semaphore immediately after a byte[] is taken from the channel, not when it has been fully processed:
const int capacity = 2;
Channel<byte[]> channel = Channel.CreateBounded<byte[]>(capacity);
SemaphoreSlim semaphore = new(capacity, capacity);
Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await semaphore.WaitAsync();
        try
        {
            byte[] array = new byte[1_000_000_000];
            // Here initialize the array...
            await channel.Writer.WriteAsync(array);
        }
        catch
        {
            semaphore.Release();
            throw;
        }
    }
});
Task consumer = Task.Run(async () =>
{
    await foreach (byte[] array in channel.Reader.ReadAllAsync())
    {
        semaphore.Release();
        // Here consume the array...
    }
});
Essentially, the SemaphoreSlim becomes the guard that limits the capacity of the channel. You might as well use an unbounded channel if you want.
This approach ensures that at most 3 byte[]s are allocated at any time (at most 2 held by the producers or stored in the channel, plus 1 being processed by the consumer), excluding those that are already eligible for garbage collection.
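The bound can be checked with a small instrumented run. This is a sketch, assuming .NET 6 or later; the live, maxLive and failedWrites counters are hypothetical instrumentation that is not part of the approach, and 16-byte arrays stand in for the 1GB ones. Because every array in the channel still holds a permit, a producer that holds a permit always finds a free slot, so even a plain TryWrite never fails.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int capacity = 2;
const int itemsPerProducer = 200;
var channel = Channel.CreateBounded<byte[]>(capacity);
var semaphore = new SemaphoreSlim(capacity, capacity);

// Hypothetical instrumentation, not part of the approach itself.
int live = 0;          // arrays allocated and not yet fully consumed
int maxLive = 0;       // highest value of `live` observed
int failedWrites = 0;  // TryWrite calls that found the channel full

void TrackAllocation()
{
    int now = Interlocked.Increment(ref live);
    int seen = Volatile.Read(ref maxLive);
    while (now > seen) // lock-free "max" update
    {
        int previous = Interlocked.CompareExchange(ref maxLive, now, seen);
        if (previous == seen) break;
        seen = previous;
    }
}

async Task ProduceAsync()
{
    for (int i = 0; i < itemsPerProducer; i++)
    {
        await semaphore.WaitAsync();
        TrackAllocation();
        byte[] array = new byte[16];          // small stand-in for a 1GB array
        await Task.Yield();                   // pretend to initialize it
        // Holding a permit guarantees a free slot, so TryWrite is enough.
        if (!channel.Writer.TryWrite(array))
            Interlocked.Increment(ref failedWrites);
    }
}

Task consumer = Task.Run(async () =>
{
    await foreach (byte[] array in channel.Reader.ReadAllAsync())
    {
        semaphore.Release();                  // the slot is free once the array has left the channel
        await Task.Yield();                   // pretend to consume it
        Interlocked.Decrement(ref live);
    }
});

await Task.WhenAll(ProduceAsync(), ProduceAsync());
channel.Writer.Complete();
await consumer;

Console.WriteLine($"max live arrays: {maxLive}, failed writes: {failedWrites}");
```

Swapping Channel.CreateBounded<byte[]>(capacity) for Channel.CreateUnbounded<byte[]>() should not change either result, since the semaphore alone enforces the limit.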
Upvotes: 0
Reputation: 456587
Buffered producer/consumer systems like channels and Dataflow are a little "fuzzy" around maximum buffer sizes (I can never remember if Dataflow counts items in output buffers or not). And as you point out, they don't count any items held by producers or consumers.
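A minimal illustration of the "items held by consumers don't count" part, assuming .NET 6 or later and using 16-byte arrays as stand-ins: a bounded channel counts only the items currently buffered, so once the consumer has taken an array out, a new write succeeds even though that array is still alive.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<byte[]>(2);

// Fill the channel to capacity (small stand-ins for 1GB arrays).
bool first = channel.Writer.TryWrite(new byte[16]);
bool second = channel.Writer.TryWrite(new byte[16]);
bool third = channel.Writer.TryWrite(new byte[16]); // full: rejected, yet already allocated

// The consumer takes one array and is still working on it.
channel.Reader.TryRead(out var beingConsumed);

// That array no longer counts against the capacity, so another write succeeds:
// three arrays are now reachable even though the capacity is 2.
bool afterRead = channel.Writer.TryWrite(new byte[16]);

Console.WriteLine($"{first} {second} {third} {afterRead}"); // prints "True True False True"
```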
So, in order to restrict the total number of objects at any time, you'll need your own allocator.
public sealed class LimitedAllocator<T>
{
    private readonly SemaphoreSlim _mutex;

    public LimitedAllocator(int maxItems) =>
        _mutex = new(maxItems);

    public async Task<AllocatedItem> AllocateAsync(Func<T> create)
    {
        await _mutex.WaitAsync();
        try
        {
            return new(this, create());
        }
        catch
        {
            // Give the slot back if the factory throws (e.g., OutOfMemoryException).
            _mutex.Release();
            throw;
        }
    }

    private void Free() => _mutex.Release();

    public sealed class AllocatedItem : IDisposable
    {
        private readonly IDisposable _disposer;

        public AllocatedItem(LimitedAllocator<T> allocator, T item)
        {
            Item = item;
            _disposer = Disposable.Create(() => allocator.Free());
        }

        public T Item { get; }

        public void Dispose() => _disposer.Dispose();
    }
}
Usage:
var allocator = new LimitedAllocator<byte[]>(3);
var channel = Channel.CreateBounded<LimitedAllocator<byte[]>.AllocatedItem>(2);
var consumer = Task.Run(async () =>
{
    await foreach (var allocatedItem in channel.Reader.ReadAllAsync())
    {
        using (allocatedItem)
        {
            // ... Do something with allocatedItem.Item
        }
    }
});
var producer1 = Task.Run(async () =>
{
    while (true)
    {
        var allocatedItem = await allocator.AllocateAsync(() => new byte[1_000_000_000]);
        // ... Do something with allocatedItem.Item
        await channel.Writer.WriteAsync(allocatedItem);
    }
});
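The producer above leaves its initialization step unguarded. A sketch of the exception path, under stated assumptions: a SemaphoreSlim with 3 slots stands in for LimitedAllocator<byte[]>(3), Release() plays the role of AllocatedItem.Dispose(), ProduceOneAsync is a hypothetical helper, and the IOException is simulated. On success the slot stays taken (the consumer frees it later); on failure it is returned before the exception propagates.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Assumption: a SemaphoreSlim with 3 slots stands in for LimitedAllocator<byte[]>(3);
// Release() plays the role of AllocatedItem.Dispose().
var slots = new SemaphoreSlim(3);

// Hypothetical producer step: take a slot, allocate, initialize.
async Task<byte[]> ProduceOneAsync(bool failInitialization)
{
    await slots.WaitAsync();                 // like allocator.AllocateAsync(...)
    try
    {
        byte[] array = new byte[16];         // small stand-in for a 1GB array
        if (failInitialization)
            throw new IOException("simulated failure while initializing the array");
        return array;                        // success: the slot stays taken until the consumer disposes
    }
    catch
    {
        slots.Release();                     // failure only: give the slot back, then rethrow
        throw;
    }
}

byte[] produced = await ProduceOneAsync(failInitialization: false);
int slotsAfterSuccess = slots.CurrentCount;  // `produced` still owns its slot

try
{
    await ProduceOneAsync(failInitialization: true);
}
catch (IOException)
{
    // The failed attempt has already returned its slot.
}
int slotsAfterFailure = slots.CurrentCount;

Console.WriteLine($"{slotsAfterSuccess} {slotsAfterFailure}"); // prints "2 2"
```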
Notes:
- The ... in the producer needs a try/catch around it that disposes the allocated item, but only in the case of exceptions.
- If T in LimitedAllocator<T> is just memory (e.g., byte[]), you can consider using an IMemoryOwner<T> instead of AllocatedItem. IMemoryOwner<T> is essentially a disposable combined with memory.
- If you'd rather avoid the AllocatedItem pairing of IDisposable-with-item, it's possible to use connected properties instead. But that tends towards more magic and less maintainable code.
Upvotes: 2
Reputation: 168
One option, although it requires a little manual management, would be a simple wrapper around a channel that limits the size.
public class LockingFixedPool<T>
{
    private readonly Channel<T> Items;
    private readonly SemaphoreSlim locker;

    public LockingFixedPool(T[] seeded)
    {
        Items = Channel.CreateBounded<T>(seeded.Length);
        foreach (var element in seeded)
        {
            Items.Writer.TryWrite(element);
        }
    }

    public async Task<T> GetItemAsync()
    {
        return await Items.Reader.ReadAsync();
    }

    public void Release(T item)
    {
        //unless you make this specific to arrays,
        //you should clear before calling release.
        Items.Writer.TryWrite(item);
    }
}
The biggest problem with this 'bare' implementation is that you have to manage manually when the item is released. As an alternative, you could return a 'wrapped' instance:
public class LockPooledItem<T> : IDisposable
{
    public T Item { get; }
    private LockingFixedPool<T> _pool;

    public LockPooledItem(T item, LockingFixedPool<T> pool)
    {
        Item = item;
        _pool = pool;
    }

    public void Dispose()
    {
        _pool.Release(Item);
    }
}
And have LockingFixedPool return instances of those. Of course, you'll then be allocating the tracking object, but it is a trade-off between composition and manual code for managing the allocation.
Edit per comment:
// This is for the sake of simplicity in the example.
SingletonHolder.LockedGibiByteArrayPool = new LockingFixedPool<byte[]>(new[] { new byte[1_000_000_000], new byte[1_000_000_000] });
Task producer1 = Task.Run(async () =>
{
    while (true)
    {
        await channel.Writer.WaitToWriteAsync();
        // Grab an array from our pool.
        byte[] array = await SingletonHolder.LockedGibiByteArrayPool.GetItemAsync();
        // Here initialize the array (mainly I/O bound, time consuming)
        // The channel should not be full,
        // but the reader -must- make sure to release the array
        // when it is done.
        // Alternatively, use the 'LockPooledItem' pattern suggested,
        // and then at least it's just a Dispose() call...
        await channel.Writer.WriteAsync(array);
    }
});
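The pool idea can also be tried out without the wrapper classes. Here is a stripped-down sketch, assuming .NET 6 or later: a channel pre-seeded with every array that will ever exist plays the role of LockingFixedPool, a second channel carries the work, and the consumer returns each array to the pool when done. The names pool, work and ProduceAsync are hypothetical, and 16-byte arrays stand in for the 1GB ones.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical setup: a channel pre-seeded with every array that will ever exist.
const int poolSize = 2;
var pool = Channel.CreateBounded<byte[]>(poolSize);
for (int i = 0; i < poolSize; i++)
    pool.Writer.TryWrite(new byte[16]);      // small stand-ins for 1GB arrays

var work = Channel.CreateBounded<byte[]>(poolSize);
var distinctArrays = new HashSet<byte[]>();  // arrays compare by reference
int processed = 0;

async Task ProduceAsync(int count)
{
    for (int i = 0; i < count; i++)
    {
        // "Rent": waits until the consumer has returned an array.
        byte[] array = await pool.Reader.ReadAsync();
        array.AsSpan().Fill((byte)i);        // "initialize" the array
        await work.Writer.WriteAsync(array);
    }
}

Task consumer = Task.Run(async () =>
{
    await foreach (byte[] array in work.Reader.ReadAllAsync())
    {
        distinctArrays.Add(array);           // only the consumer touches this set
        processed++;
        // "Release": hand the array back. The pool cannot be full here,
        // because this array was taken out of it.
        pool.Writer.TryWrite(array);
    }
});

await Task.WhenAll(ProduceAsync(100), ProduceAsync(100));
work.Writer.Complete();
await consumer;

Console.WriteLine($"processed {processed} items using {distinctArrays.Count} arrays");
```

Since no array is ever allocated after the seeding loop, the total stays at the pool size no matter how the producers and consumer interleave. The trade-off is that the arrays are reused, so the consumer must be finished with an array before returning it.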
Upvotes: 3