0xSingularity

Reputation: 570

C# Speed Up Writing from Response Stream to ViewStream

I have code that asynchronously splits a file into pieces and downloads each one with an HTTP Range request. It then writes the downloaded data to a view stream on a memory-mapped file. Currently I read from the response stream into a buffer, then write the buffer's contents to the view stream. Is there a more efficient or faster way to do this? I am not really concerned about memory use; I am trying to maximize speed. pieces is a list of value tuples holding the (Start, End) of each piece of the file, and httpPool is an object pool with a bunch of preconfigured HTTP clients. Any help is greatly appreciated, thank you!

await Parallel.ForEachAsync(pieces,
                        new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
                        async (piece, cancellationToken) =>
                        {
                            //Get a http client from the pool and request for the content range
                            var client = httpPool.Get();
                            var request = new HttpRequestMessage { RequestUri = new Uri(url) };
                            request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);
                            
                            //Request headers so we dont cache the file into memory
                            if (client != null)
                            {
                                var message = await client.SendAsync(request,HttpCompletionOption.ResponseHeadersRead,cancellationToken).ConfigureAwait(false);

                                if (message.IsSuccessStatusCode)
                                {
                                    //Get the content stream from the message request
                                    using (var streamToRead = await message.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
                                    {
                                        //Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
                                        using (var streams = mmf.CreateViewStream(piece.Item1,message.Content.Headers.ContentLength!.Value,MemoryMappedFileAccess.Write))
                                        {
                                            //Copy from the content stream to the mmf stream
                                            var buffer = new byte[bufferSize];
                                            int offset, bytesRead;
                                            // Until we've read everything
                                            do
                                            {
                                                offset = 0;
                                                // Until the buffer is very nearly full or there's nothing left to read
                                                do
                                                {
                                                    bytesRead = await streamToRead.ReadAsync(buffer.AsMemory(offset, bufferSize - offset),cancellationToken);
                                                    offset += bytesRead;
                                                } while (bytesRead != 0 && offset < bufferSize);

                                                // Empty the buffer
                                                if (offset != 0)
                                                {
                                                    await streams.WriteAsync(buffer.AsMemory(0, offset),cancellationToken);
                                                }
                                            } while (bytesRead != 0);

                                            streams.Flush();
                                            streams.Close();
                                        }

                                        streamToRead.Close();
                                    }
                                }

                                message.Content.Dispose();
                                message.Dispose();
                            }

                            request.Dispose();
                            httpPool.Return(client);
                        });
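
For context, a list like pieces could be built roughly as follows. This is only a sketch: PieceSplitter, SplitIntoPieces and pieceSize are hypothetical names that do not appear in the code above. The one detail it illustrates is that End is inclusive, because the code passes (Item1, Item2) straight into RangeHeaderValue and HTTP byte ranges include both ends.

```csharp
using System;
using System.Collections.Generic;

public static class PieceSplitter
{
    // Hypothetical helper: splits a file of totalLength bytes into pieces of
    // at most pieceSize bytes. End is inclusive, matching "bytes=Start-End".
    public static List<(long Start, long End)> SplitIntoPieces(long totalLength, long pieceSize)
    {
        if (totalLength < 0) throw new ArgumentOutOfRangeException(nameof(totalLength));
        if (pieceSize <= 0) throw new ArgumentOutOfRangeException(nameof(pieceSize));

        var pieces = new List<(long Start, long End)>();
        for (long start = 0; start < totalLength; start += pieceSize)
        {
            // The last piece may be shorter than pieceSize.
            long end = Math.Min(start + pieceSize, totalLength) - 1;
            pieces.Add((start, end));
        }
        return pieces;
    }
}
```

With this layout, each piece is End - Start + 1 bytes long, which is the size the server should report in Content-Length for that range.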

Upvotes: 0

Views: 349

Answers (1)

Petrusion

Reputation: 1153

I don't know how much this is going to help, but here is an attempt. How well does it work for you?

I also did some refactoring, so here are some notes:

  • Do not call .Close() or .Dispose() manually if you already have a using block or a using statement. All it does is add noise to your code and confuse anyone reading it. In fact, almost never call .Close() or .Dispose() manually at all.
  • Do you realize client would never be returned to the pool if any exception occurred in the method? You need to do these things in a finally block, or by using an IDisposable struct which returns client to the pool in its Dispose() implementation. (Also, request would not be disposed if any exception occurred, so add using.)
  • Whenever you can, prefer if statements that return early rather than ones that wrap the entire rest of the method. The latter is hard to read and maintain.
  • You are not really benefiting from Parallel as 99% of the method is asynchronously waiting for IO. Just use Task.WhenAll() instead.
  • I got rid of the custom buffering/copying and just called the CopyToAsync() method on message.Content, which accepts a Stream. It will probably help performance; I reckon the built-in copy is better optimized than a hand-rolled buffer loop.
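
The "IDisposable struct" idea from the notes above could look something like this. It is a minimal sketch, not something from the original code: PooledLease and its members are hypothetical names, and it takes the "give it back" action as a delegate so it makes no assumption about the actual type of httpPool.

```csharp
using System;

// Hypothetical helper: holds an item taken from a pool and hands it back
// through the supplied callback when disposed.
public readonly struct PooledLease<T> : IDisposable where T : class
{
    private readonly Action<T> _giveBack;

    public PooledLease(T item, Action<T> giveBack)
    {
        Item = item;
        _giveBack = giveBack;
    }

    // The leased item; use it only while the lease is alive.
    public T Item { get; }

    // Null-conditional so a default(PooledLease<T>) does not throw on Dispose.
    public void Dispose() => _giveBack?.Invoke(Item);
}
```

Assuming httpPool exposes Get() and a void Return(HttpClient), usage would be along the lines of `using var lease = new PooledLease<HttpClient>(httpPool.Get(), httpPool.Return);` followed by `lease.Item.SendAsync(...)`. Since it is a struct, avoid copying the lease around: disposing two copies would return the same client twice.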

Code:

await Task.WhenAll(pieces.Select(p => DownloadToMemoryMappedFile(p)));

// change the piece type from dynamic to what you need
async Task DownloadToMemoryMappedFile(dynamic piece, CancellationToken cancellationToken = default)
{
    //Get a http client from the pool and request for the content range
    var client = httpPool.Get();

    try
    {
        if (client is null)
            return;

        //Request only this piece's byte range
        using var request = new HttpRequestMessage { RequestUri = new Uri(url) };
        request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);

        //ResponseHeadersRead so we dont cache the file into memory
        using var message = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

        if (!message.IsSuccessStatusCode)
            return;

        //Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
        using var streams = mmf.CreateViewStream(piece.Item1, message.Content.Headers.ContentLength!.Value, MemoryMappedFileAccess.Write);

        //Let the framework do the copy, and pass the token so the copy can be cancelled too
        await message.Content.CopyToAsync(streams, cancellationToken).ConfigureAwait(false);
    }
    finally
    {
        httpPool.Return(client);
    }
}
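
One caveat: Task.WhenAll() starts every piece at once, whereas the original Parallel.ForEachAsync call capped concurrency at Environment.ProcessorCount. If the pool or the server needs a cap, a SemaphoreSlim is one way to keep it. The following is a sketch under that assumption; Throttled, ForEachAsync and maxConcurrency are hypothetical names, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Runs work for every item, with at most maxConcurrency invocations in flight.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items,
        int maxConcurrency,
        Func<T, CancellationToken, Task> work,
        CancellationToken cancellationToken = default)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        async Task RunOneAsync(T item)
        {
            // Wait for a free slot before starting this item.
            await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                await work(item, cancellationToken).ConfigureAwait(false);
            }
            finally
            {
                // Free the slot even if work throws.
                gate.Release();
            }
        }

        await Task.WhenAll(items.Select(item => RunOneAsync(item))).ConfigureAwait(false);
    }
}
```

It could be called as `await Throttled.ForEachAsync(pieces, 8, (p, ct) => DownloadToMemoryMappedFile(p, ct));`, where 8 is an arbitrary example value that would need measuring against the real server.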

Upvotes: 1
