Reputation: 1343
I'm coding my own HttpClient that should Handle HTTP - 429
(TooManyRequests) responses. I'm executing a single method in the client in parallel. As soon as I get a 429
StatusCode as a response, I would like to pause the execution of all Tasks, that are currently calling the method.
Currently, I'm using very old code from an old MS DevBlog: PauseToken/Source
private readonly HttpClient _client;
private readonly PauseTokenSource PauseSource;
private readonly PauseToken PauseToken;
public MyHttpClient(HttpClient client)
{
_client = client;
PauseSource = new();
PauseToken = PauseSource.Token;
}
public async Task<HttpResponseMessage> PostAsJsonAsync<TValue>(string? requestUri?, TValue value, CancellationToken cancellationToken = default)
{
try
{
await PauseToken.WaitWhilePausedAsync(); // I'd really like to pass the cancellationToken as well
HttpResponseMessage result = await _client.PostAsJsonAsync(requestUri, value, cancellationToken).ConfigureAwait(false);
if (result.StatusCodes == HttpStatusCode.TooManyRequests)
{
PauseSource.IsPaused = true;
TimeSpan delay = (result.Headers.RetryAfter?.Date - DateTimeOffset.UtcNow) ?? TimeSpan.Zero;
await Task.Delay(delay, cancellationToken);
PauseSource.IsPaused = false;
return await PostAsJsonAsync(requestUri, value, cancellationToken);
}
return result;
}
finally
{
PauseSource.IsPaused = false;
}
}
MyHttpClient.PostAsJsonAsync
is called like this:
private readonly MyHttpClient _client; // This gets injected by the constructor DI
private string ApiUrl; // This as well
public async Task SendToAPIAsync<T>(IEnumerable<T> items, CancellationToken cancellationToken = default)
{
IEnumerable<Task<T>> tasks = items.Select(item =>
_client.PostAsJsonAsync(ApiUrl, item, cancellationToken));
await Task.WhenAll(tasks).ConfigureAwait(false);
}
The items
collection will contain 15'000 - 25'000 items. The API is unfortunately built so I have to make 1 request for each item.
I really dislike using old code like this, since I honestly don't even know what it does under the hood (the entire source code can be looked at in the linked article above). Also, I'd like to pass my cancellationToken
to the WaitWhilePausedAsync()
method since execution should be able to be cancelled at any time.
Is there really no easy way to "pause an async
method"?
I've tried to store the DateTimeOffset
I get from the result
->RetryAfter
in a local field, then just simply Task.Delay()
the delta to DateTimeOffset.UtcNow
, but that didn't seem to work and I also don't think it's very performant.
I like the idea of having a PauseToken
but I think there might be better ways to do this nowadays.
Upvotes: 0
Views: 194
Reputation: 1343
I ultimately created a library that contains a HttpClientHandler
which handles these results for me. For anyone interested, here's the repo: github.com/baltermia/too-many-requests-handler (the NuGet package is linked in the readme).
A comment above led me to the solution below. I used the github.com/StephenCleary/AsyncEx library, that both has PauseTokenSource
and the AsyncLock
types which provided the functionality I was searching for.
private readonly AsyncLock _asyncLock = new();
private readonly HttpClient _client;
private readonly PauseTokenSource _pauseSource = new();
public PauseToken PauseToken { get; }
public MyHttpClient(HttpClient client)
{
_client = client;
PauseToken = _pauseSource.Token;
}
public async Task<HttpResponseMessage> PostAsJsonAsync<TValue>(string? requestUri?, TValue value, CancellationToken cancellationToken = default)
{
{
// check if requests are paused and wait
await PauseToken.WaitWhilePausedAsync(cancellationToken).ConfigureAwait(false);
HttpResponseMessage result = await _client.PostAsJsonAsync(requestUri, value, cancellationToken).ConfigureAwait(false);
// if result is anything but 429, return (even if it may is an error)
if (result.StatusCode != HttpStatusCode.TooManyRequests)
return result;
// create a locker which will unlock at the end of the stack
using IDisposable locker = await _asyncLock.LockAsync(cancellationToken).ConfigureAwait(false);
// calculate delay
DateTimeOffset? time = result.Headers.RetryAfter?.Date;
TimeSpan delay = time - DateTimeOffset.UtcNow ?? TimeSpan.Zero;
// if delay is 0 or below, return new requests
if (delay <= TimeSpan.Zero)
{
// very important to unlock
locker.Dispose();
// recursively recall itself
return await PostAsJsonAsync(requestUri, value, cancellationToken).ConfigureAwait(false);
}
try
{
// otherwise pause requests
_pauseSource.IsPaused = true;
// then wait the calculated delay
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
finally
{
_pauseSource.IsPaused = false;
}
// make sure to unlock again (otherwise the method would lock itself because of recursion)
locker.Dispose();
// recursively recall itself
return await PostAsJsonAsync(requestUri, value, cancellationToken).ConfigureAwait(false);
}
}
Upvotes: 1
Reputation: 36361
I really dislike using old code like this
Just because code is old does not necessarily mean it is bad.
Also, I'd like to pass my cancellationToken to the WaitWhilePausedAsync() method since execution should be able to be cancelled at any time
As far as I can tell, the WaitWhilePausedAsync
just returns a task, If you want to abort as soon as the cancellation token is cancelled you could use this answer for an WaitOrCancel
extension, used like:
try{
await PauseToken.WaitWhilePausedAsync().WaitOrCancel(cancellationToken );
}
catch(OperationCancelledException()){
// handle cancel
}
Is there really no easy way to "pause an async method"?
To 'pause and async method' should mean we need to await something, since we probably want to avoid blocking. That something need to be a Task
, so such a method would probably involve creating a TaskCompletionSource that can be awaited, that completes when unpaused. That seem to be more or less what your PauseToken does.
Note that any type of 'pausing' or 'cancellation' need to be done cooperatively, so any pause feature need to be built, and probably need to be built by you if you are implementing your own client.
But there are might be alternative solutions. Maybe use a SemaphoreSlim for rate-limiting? Maybe just delay the request a bit if you get a ToManyRequests error? Maybe use a central queue of requests that can be throttled?
Upvotes: 1