Reputation: 1180
I recently benchmarked my framework and noticed that it allocates tons of garbage. I'm using a Channel<T>, and the TryRead or ReadAsync operation allocates memory on every single call. So I swapped it for a BlockingCollection<T>, which also allocates memory during TryTake.
I used an unbounded channel with a single writer/reader, and a plain BlockingCollection<T>.
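The channel itself is created roughly like this (simplified sketch; JobMeta stands in for my actual struct type):
var channel = Channel.CreateUnbounded<JobMeta>(new UnboundedChannelOptions
{
    SingleReader = true,  // one consumer thread
    SingleWriter = true   // one producer thread
});
Reader = channel.Reader;
Writer = channel.Writer;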
// Each thread runs this; jobMeta is just a struct
while (!token.IsCancellationRequested)
{
var jobMeta = await Reader.ReadAsync(token); // <- allocs here
jobMeta.Job.Execute();
jobMeta.JobHandle.Notify();
}
The profiler told me that all allocations are caused by the ChannelReader.ReadAsync method. Unfortunately I can't show the full code, but since I use these in a hot path, I need to avoid allocations at all costs.
Are there any alternatives that do not allocate memory during read/write/take and behave the same (concurrent classes for producer/consumer multithreading)? How could I implement one myself?
Upvotes: 3
Views: 2482
Reputation: 131423
You don't need an alternative type but an alternative way of using Channels. Instead of cancelling the reader, close the writer. This is the technique used in the .NET RabbitMQ client.
Beyond that, it's far more important to ensure that the payload itself doesn't require lots of allocations. If you use ArrayPool to reduce allocations, you can't just abort the worker method; you need to retrieve the pending messages and return their buffers.
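Applied to a loop like the one in the question, the shape is roughly this (a sketch only; reader, writer and consumerTask stand in for your own fields):
// Consumer: no CancellationToken. The loop ends on its own once the writer
// is completed and the remaining items have been drained.
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
    while (reader.TryRead(out var jobMeta))
    {
        jobMeta.Job.Execute();
        jobMeta.JobHandle.Notify();
    }
}

// Shutdown: complete the writer and wait for the consumer to finish draining.
writer.Complete();
await consumerTask;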
Channels in RabbitMQ
RabbitMQ uses Channels in its hottest path: when writing message frames to the network stream and when receiving bytes. Few libraries handle more traffic than RabbitMQ; Kestrel is one of them, and its use case is similar. Both are expected to handle millions of requests/messages without overhead.
Writing is handled by the WriteLoop method, which runs in a background task:
_writerTask = Task.Run(WriteLoop);
The WriteLoop method itself doesn't cancel. If it did, it would risk leaving a message unfinished.
private async Task WriteLoop()
{
try
{
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
#if NETSTANDARD
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
#else
await _writer.WriteAsync(memory).ConfigureAwait(false);
#endif
RabbitMqClientEventSource.Log.CommandSent(segment.Count);
ArrayPool<byte>.Shared.Return(segment.Array);
}
await _writer.FlushAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
{
ESLog.Error("Background socket write loop has crashed", ex);
throw;
}
}
In this code, _writer is a buffered network stream:
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
The Write method is just a call to _channelWriter.TryWrite, so it doesn't have to check whether the writer is closed or not:
public void Write(ReadOnlyMemory<byte> memory)
{
_channelWriter.TryWrite(memory);
}
Cancelling means calling Close, which completes the writer and, once the pipeline completes, closes the socket:
try
{
_channelWriter.Complete();
_writerTask?.GetAwaiter().GetResult();
}
catch
{
// ignore, we are closing anyway
}
try
{
_socket.Close();
}
catch
{
// ignore, we are closing anyway
}
finally
{
_closed = true;
}
This ensures that any pending messages are still processed.
As mentioned above, it's far more important to ensure that the payload doesn't require lots of allocations. RabbitMQ uses a Channel.CreateUnbounded<ReadOnlyMemory<byte>>. The buffers themselves come from ArrayPool<byte>.Shared. Once a frame is sent, the buffer is returned to the pool:
ArrayPool<byte>.Shared.Return(segment.Array);
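The producer side of that pattern looks roughly like this (an illustrative sketch, not the actual RabbitMQ code; WriteFrame is a hypothetical helper that serializes a frame into the rented buffer):
// Rent a buffer, fill it, and hand ownership over to the channel.
// The write loop returns it to the pool after the frame has been sent.
byte[] buffer = ArrayPool<byte>.Shared.Rent(frameSize);
int written = WriteFrame(buffer); // hypothetical serialization helper
_channelWriter.TryWrite(new ReadOnlyMemory<byte>(buffer, 0, written));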
Nuking the Pipeline without leaking buffers
In normal operation you wouldn't need to cancel at all. You'd only need to if you had to nuke the pipeline instead of stopping the publisher. One such case could be that a downstream worker can't process messages anymore because a network connection was terminated. In that case, calling Complete on the publisher could still leave messages in the Channel that can no longer be processed.
The cooperative way to handle this is to discard the messages when cancellation is signaled, not to cancel the Read operation itself. If we use ArrayPool, we still have to receive those buffers in order to return them to the pool.
The RabbitMQ code doesn't work this way, but handling this needs only a simple change in the inner loop:
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
if(!token.IsCancellationRequested)
{
// Normal processing
}
ArrayPool<byte>.Shared.Return(segment.Array);
}
await _writer.FlushAsync().ConfigureAwait(false);
}
This ensures the worker method stops processing messages immediately without leaking buffers.
Structs and multi-threaded workers in RabbitMQ
RabbitMQ uses the same pattern in the ConsumerDispatcherChannelBase class. In this case the payload is a readonly struct, WorkStruct, which also holds a pooled buffer (RentedArray).
The difference this time is that there can be multiple consumers for the same RabbitMQ connection, so the class can start multiple worker tasks:
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
Shutting down works the same way, though. Complete the writer and wait for the workers to exit:
protected override Task InternalShutdownAsync()
{
_writer.Complete();
return _worker;
}
Upvotes: 1
Reputation: 43545
The System.Threading.Channels library currently has three built-in Channel<T> implementations: UnboundedChannel<T>, SingleConsumerUnboundedChannel<T> and BoundedChannel<T>.
Of those implementations, the least allocatey is the BoundedChannel<T>. If you don't want bounds, you can configure it with capacity: Int32.MaxValue. The UnboundedChannel<T> is internally based on a ConcurrentQueue<T>, which is a very performant, low-contention collection (it's lock-free). The allocations are a necessary trade-off for being lock-free. The BoundedChannel<T> is based on an internal Deque<T> collection, which is synchronized with locks. It allocates memory only when it has to expand the capacity of its backing array, which will happen only a few times during the lifetime of the channel.
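For example, a pseudo-unbounded bounded channel could be created like this (a sketch using the JobMeta struct from the question):
var channel = Channel.CreateBounded<JobMeta>(new BoundedChannelOptions(capacity: Int32.MaxValue)
{
    SingleReader = true,
    SingleWriter = true
});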
The BlockingCollection<T> is also based on a ConcurrentQueue<T> by default, so it has the same advantages and disadvantages. If you want to reduce the allocations (at the cost of lower performance and higher contention), you could implement an IProducerConsumerCollection<T> based on a synchronized Queue<T> and pass it as an argument to the BlockingCollection<T> constructor. You could use this answer as a starting point.
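For illustration, such a collection might look roughly like this (a simplified sketch, not production-ready; the usage line at the end shows how it would be passed to the BlockingCollection<T> constructor):
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// A lock-protected Queue<T> exposed as an IProducerConsumerCollection<T>.
// Trades the lock-free behavior of ConcurrentQueue<T> for fewer allocations.
public sealed class SynchronizedQueue<T> : IProducerConsumerCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();

    public bool TryAdd(T item)
    {
        lock (_queue) _queue.Enqueue(item);
        return true;
    }

    public bool TryTake(out T item)
    {
        lock (_queue)
        {
            if (_queue.Count > 0) { item = _queue.Dequeue(); return true; }
        }
        item = default!;
        return false;
    }

    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();

    public void CopyTo(T[] array, int index) { lock (_queue) _queue.CopyTo(array, index); }
    void ICollection.CopyTo(Array array, int index) { lock (_queue) ((ICollection)_queue).CopyTo(array, index); }
    public T[] ToArray() { lock (_queue) return _queue.ToArray(); }

    // Enumeration works on a snapshot, similar to ConcurrentQueue<T>.
    public IEnumerator<T> GetEnumerator() => ((IEnumerable<T>)ToArray()).GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

// Usage:
// var queue = new BlockingCollection<JobMeta>(new SynchronizedQueue<JobMeta>());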
Finally, passing CancellationTokens to any of these APIs will result in allocations no matter what. The CancellationToken must register a callback in order to take effect immediately, and callbacks cannot be registered without allocations. My suggestion is to get rid of the CancellationToken and find some other way of completing gracefully, such as the ChannelWriter<T>.Complete or BlockingCollection<T>.CompleteAdding methods.
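For example, with a BlockingCollection<T>, graceful completion without a token could look roughly like this (a sketch reusing the JobMeta struct from the question; queue stands in for your BlockingCollection<JobMeta>):
// Consumer: GetConsumingEnumerable needs no CancellationToken. It blocks while
// the collection is empty and ends once CompleteAdding has been called and the
// remaining items have been consumed.
foreach (var jobMeta in queue.GetConsumingEnumerable())
{
    jobMeta.Job.Execute();
    jobMeta.JobHandle.Notify();
}

// Producer, during shutdown: signal that no more items will be added.
queue.CompleteAdding();
The Channel<T> equivalent is to call ChannelWriter<T>.Complete and let the read loop drain the channel until WaitToReadAsync returns false.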
Upvotes: 9