23W

Reputation: 1530

How to handle broken connection during active HTTP client operation?

This C# code example downloads a large file (1 GB) and saves it as "download.bin". The operation timeout is set to 20 seconds.

async Task RunHttpDownload()
{
    var url = "https://speed.hetzner.de/1GB.bin";

    Console.WriteLine($"Start download {DateTime.Now}");

    try
    {
        using (var httpClient = new HttpClient() { Timeout = TimeSpan.FromSeconds(20) })
        using (var httpResp = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead))
        {
            httpResp.EnsureSuccessStatusCode();

            using (var fileStream = new FileStream("download.bin", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read, bufferSize: 4096, useAsync: true))
            using (var httpStream = await httpResp.Content.ReadAsStreamAsync())
            {
                await httpStream.CopyToAsync(fileStream);
            }
        }
    }
    catch (Exception e)
    {
        Console.WriteLine($"Exception: {e.Message}");
    }

    Console.WriteLine($"End download {DateTime.Now}");
}

The GET request uses HttpCompletionOption.ResponseHeadersRead to avoid loading the entire 1 GB file into memory. If the user loses the Internet connection (for example, WiFi is switched off) while the code is at the line await httpStream.CopyToAsync(fileStream); (the method spends 99% of its time there), the operation never finishes, even though the timeout is 20 seconds. For some reason the C# HTTP client cannot detect that the connection is lost (broken) and throw an exception about it.

How to fix it?

Upvotes: 1

Views: 468

Answers (2)

23W

Reputation: 1530

Source code for the fix proposed by @stephen-cleary. See his answer.

This is a Stream.CopyToAsync extension method that reads data from the source stream in chunks and applies a timeout to each chunk read.

My investigation showed that the HTTP stream doesn't check the cancellation token while reading the response body as often as required, so there is a good chance that the operation hangs when WiFi disconnects. As a result, the classic way of adding a timeout, a new cancellation token linked with the source token, doesn't work reliably.

To avoid this problem I implemented another method: a background task that waits for the required time. In my code it's the default method.

    public static class StreamExtension
    {
        public enum TimeOutMethod
        {
            /// <summary>
            /// Timeout is calculated independently by background waiting task.
            /// </summary>
            IndependentWaiting,

            /// <summary>
            /// Timeout is calculated by the new cancellation token linked with source operation token.
            /// This method works well only if the async operation periodically checks the cancellation token.
            /// </summary>
            LinkedCancelattion,

            /// <summary>
            /// Timeout is calculated by Stream itself.
            /// </summary>
            StreamTimeout,
        }

        /// <summary>
        /// CopyToAsync extension method.
        /// Method reads data from source stream in chunks. A chunk is a memory buffer of the required size.
        /// Method limits buffer filling operation by timeout.
        /// If the timeout expires before the buffer read completes, the method throws a TimeoutException.
        /// </summary>
        /// <param name="source">Source stream</param>
        /// <param name="destination">Destination stream</param>
        /// <param name="readBufferSize">Size in bytes of the buffer for the read operation</param>
        /// <param name="readBufferTimeOut">Timeout of reading the buffer</param>
        /// <param name="timeOutMethod">Timeout calculation method</param>
        /// <param name="cancellationToken">Operation cancellation token</param>
        /// <returns></returns>
        /// <exception cref="ArgumentException"></exception>
        /// <exception cref="TimeoutException"></exception>
        public static Task CopyToAsync(this Stream source, Stream destination, int readBufferSize, TimeSpan readBufferTimeOut, TimeOutMethod timeOutMethod = TimeOutMethod.IndependentWaiting, CancellationToken cancellationToken = default(CancellationToken))
        {
            var res = default(Task);
            // default(CancellationToken) is the same value as CancellationToken.None, so no conversion is needed
            var token = cancellationToken;

            switch (timeOutMethod)
            {
                case TimeOutMethod.IndependentWaiting:
                    res = CopyToIndependentWaitingAsync(source, destination, readBufferSize, readBufferTimeOut, token);
                    break;

                case TimeOutMethod.LinkedCancelattion:
                    res = CopyToLinkedCancellationAsync(source, destination, readBufferSize, readBufferTimeOut, token);
                    break;

                case TimeOutMethod.StreamTimeout:
                    res = CopyToStreamTimeoutAsync(source, destination, readBufferSize, readBufferTimeOut, token);
                    break;

                default:
                    // the single-string constructor of ArgumentOutOfRangeException takes the parameter name
                    throw new ArgumentOutOfRangeException(nameof(timeOutMethod));
            }

            return res;
        }

        #region Helper Methods

        static async Task CopyToIndependentWaitingAsync(Stream source, Stream destination, int readBufferSize, TimeSpan readBufferTimeOut, CancellationToken cancellationToken)
        {
            var chunk = new byte[readBufferSize];

            var read = 0;
            do
            {
                var readingTask = source.ReadAsync(chunk, 0, chunk.Length, cancellationToken);
                var waitingTask = Task.Delay(readBufferTimeOut, cancellationToken);
                var tasks = new Task[] { readingTask, waitingTask };

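                // Task.WhenAny waits asynchronously; Task.WaitAny would block the calling thread
                var taskIndex = Array.IndexOf(tasks, await Task.WhenAny(tasks).ConfigureAwait(false));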
                if (tasks[taskIndex].IsCanceled)
                {
                    // normal task cancellation
                    throw new TaskCanceledException(tasks[taskIndex]);
                }
                else if (tasks[taskIndex].Exception != default(Exception))
                {
                    // it re-throws a task exception with task stack information
                    await tasks[taskIndex];
                }

                if (taskIndex != 0)
                {
                    throw new TimeoutException();
                }

                read = readingTask.Result;
                if (read > 0)
                {
                    await destination.WriteAsync(chunk, 0, read, cancellationToken).ConfigureAwait(false);
                }
            }
            while (read != 0);
        }

        static async Task CopyToLinkedCancellationAsync(Stream source, Stream destination, int readBufferSize, TimeSpan readBufferTimeOut, CancellationToken cancellationToken)
        {
            var chunk = new byte[readBufferSize];

            var read = 0;
            do
            {
                try
                {
                    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
                    {
                        cts.CancelAfter(readBufferTimeOut);

                        read = await source.ReadAsync(chunk, 0, chunk.Length, cts.Token).ConfigureAwait(false);
                        if (read > 0)
                        {
                            await destination.WriteAsync(chunk, 0, read, cancellationToken).ConfigureAwait(false);
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        throw;
                    }
                    else
                    {
                        throw new TimeoutException();
                    }
                }
            }
            while (read != 0);
        }

        static async Task CopyToStreamTimeoutAsync(Stream source, Stream destination, int readBufferSize, TimeSpan readBufferTimeOut, CancellationToken cancellationToken)
        {
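            // Stream.ReadTimeout throws InvalidOperationException if the stream
            // does not support timeouts (source.CanTimeout is false)
            var oldReadTimeout = source.ReadTimeout;
            source.ReadTimeout = (int)readBufferTimeOut.TotalMilliseconds;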

            try
            {
                await source.CopyToAsync(destination, readBufferSize, cancellationToken).ConfigureAwait(false);
            }
            finally
            {
                source.ReadTimeout = oldReadTimeout;
            }
        }

        #endregion
    }

Now the file download works as required, with a timeout. Example:

        static async Task RunTotalCommander()
        {
            var url = "https://speed.hetzner.de/1GB.bin";
            var bufferSize = 40 * 1024;
            var bufferTimeOut = TimeSpan.FromSeconds(20);

            Console.WriteLine($"Start download {DateTime.Now}");

            try
            {
                using (var httpClient = new HttpClient() { Timeout = bufferTimeOut })
                using (var httpResp = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead))
                {
                    httpResp.EnsureSuccessStatusCode();

                    using (var fileStream = new FileStream("download.bin", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read, bufferSize: 4096, useAsync: true))
                    using (var httpStream = await httpResp.Content.ReadAsStreamAsync())
                    {
                        await httpStream.CopyToAsync(fileStream, bufferSize, bufferTimeOut, StreamExtension.TimeOutMethod.IndependentWaiting);
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"Exception: {e.Message}");
            }

            Console.WriteLine($"End download {DateTime.Now}");
        }
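
The independent-waiting method used in StreamExtension can also be factored into a small reusable helper. The following is only a minimal sketch, not part of the original fix: the `TaskTimeoutExtension` class and the `TimeoutAfter` method are hypothetical names. It races a task against `Task.Delay` with `Task.WhenAny` and cancels the delay once the task has finished, so a timer is not left running after every chunk.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class TaskTimeoutExtension
{
    // Waits for the task for at most 'timeout'.
    // Throws TimeoutException if the time expires first and
    // OperationCanceledException if the caller's token is canceled first.
    // The abandoned task is not stopped; it is just no longer awaited.
    public static async Task<T> TimeoutAfter<T>(
        this Task<T> task,
        TimeSpan timeout,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        using (var delayCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var delayTask = Task.Delay(timeout, delayCts.Token);
            var completed = await Task.WhenAny(task, delayTask).ConfigureAwait(false);

            if (completed == task)
            {
                // Stop the timer, then await the task to get its result
                // (or to re-throw its exception).
                delayCts.Cancel();
                return await task.ConfigureAwait(false);
            }

            // The delay finished first: either the caller canceled or the time expired.
            cancellationToken.ThrowIfCancellationRequested();
            throw new TimeoutException();
        }
    }
}
```

With such a helper, a chunk read could be written as `read = await source.ReadAsync(chunk, 0, chunk.Length, cancellationToken).TimeoutAfter(readBufferTimeOut, cancellationToken);`. On .NET 6 and later, the built-in `Task.WaitAsync(TimeSpan, CancellationToken)` offers similar behavior.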

Upvotes: 0

Stephen Cleary

Reputation: 456537

For some reason C# HTTP client can[not] detect case when connection is lost (broken) and throw some exception about it.

The reason is actually the HTTP protocol. It was designed a long time ago before people knew much about protocol design, and it forces a one-way communication from client to server followed by one-way communication from server to client. The problem with one-way communication over TCP/IP is that the receiving end has no notification for when the connection is dropped. In the future, this will be less of an issue, since I believe HTTP/QUIC has proper heartbeat support.

For now, HTTP clients and servers work around this problem by using timers, which is the only reliable approach for a protocol you can't change, as I explain on my blog. Servers give clients a certain amount of time to finish sending their request. Client support for timeouts is a bit different; most clients use one timeout value for the whole round trip time. For common REST requests (which have relatively small request and response bodies), this works well enough that most developers aren't even aware there's a timeout to work around a limitation in the HTTP protocol.

So, let's talk solutions. Your existing timeout isn't working because HttpClient.Timeout only applies to the time spent in SendAsync. With ResponseHeadersRead, SendAsync does not cover the entire response - just until the headers are read. So the HttpClient.Timeout only runs until the response body starts, not until it ends.

There's nothing in HttpClient that provides timeouts on reading the response body if you use ResponseHeadersRead. So you'll have to use your own timeouts. Here's one that gives the body 20 seconds to complete:

using (var fileStream = new FileStream("download.bin", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read, bufferSize: 4096, useAsync: true))
using (var httpStream = await httpResp.Content.ReadAsStreamAsync())
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
{
  await httpStream.CopyToAsync(fileStream, cts.Token);
}
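
With a timeout like this, an expired timer typically surfaces as an OperationCanceledException, which looks the same as a cancellation requested by the user. The sketch below shows one way to tell the two apart by linking a user token to the timeout and translating only the timeout case into a TimeoutException. It is an illustration under assumptions, not part of the original answer: `BodyCopy.CopyWithTimeoutAsync` is a hypothetical helper, and `StalledStream` is a hypothetical test double that simulates a dropped connection with a read that never completes. Note that `StalledStream` observes the cancellation token, which a real network stream may not always do promptly.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class BodyCopy
{
    public static async Task CopyWithTimeoutAsync(
        Stream source,
        Stream destination,
        TimeSpan timeout,
        CancellationToken userToken = default(CancellationToken))
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(userToken))
        {
            cts.CancelAfter(timeout);
            try
            {
                await source.CopyToAsync(destination, 81920, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (!userToken.IsCancellationRequested)
            {
                // The user did not cancel, so the timer must have fired.
                throw new TimeoutException($"Response body was not read within {timeout}.");
            }
        }
    }
}

// Hypothetical test double: a read-only stream whose ReadAsync never returns data.
// It completes only when the cancellation token is canceled.
public sealed class StalledStream : Stream
{
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
        return 0;
    }
}
```

Calling `BodyCopy.CopyWithTimeoutAsync(new StalledStream(), new MemoryStream(), TimeSpan.FromSeconds(1))` reproduces the stalled-download case without switching off WiFi. Keep in mind that the timeout here covers the whole body, so for a 1 GB download it has to be large enough for the full transfer.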

Upvotes: 2
