Reputation: 100248
I have an Azure WebJob with blob and queue triggers that saves data to Azure DocumentDB.
From time to time I'm getting an error:
Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}
Currently I throttle requests using this code. A WebJob function:
public async Task ParseCategoriesFromCsv(...)
{
    double find = 2.23, add = 5.9, replace = 10.67;
    double requestCharge = Math.Round(find + Math.Max(add, replace));
    await categoryProvider.SaveCategories(requestCharge, categories);
}
Category provider to manipulate document db client:
public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
    var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx
    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });
    return await Task.WhenAll(documents.Select(async d =>
        await scheduler.ScheduleTask(
            () => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
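To make the delay arithmetic concrete, here is a minimal sketch of the same two formulas in isolation. ThrottleMath, RequestCharge and RequestDelay are illustrative names that do not appear in the code above, and the value 400 used in the note below is only an assumed stand-in for collectionOptions.RequestUnits, whose real value is not given here.

```csharp
using System;

public static class ThrottleMath
{
    // Estimated charge of one save: one find plus the costlier of add or replace,
    // rounded to the nearest whole number (same expression as ParseCategoriesFromCsv).
    public static double RequestCharge(double find, double add, double replace) =>
        Math.Round(find + Math.Max(add, replace));

    // Same expression as SaveCategories. Because it divides 60 seconds by the
    // number of affordable requests, it treats requestUnits as a per-minute budget.
    public static TimeSpan RequestDelay(double requestUnits, double requestCharge) =>
        TimeSpan.FromSeconds(60.0 / (requestUnits / requestCharge));
}
```

With the constants above, ThrottleMath.RequestCharge(2.23, 5.9, 10.67) rounds 12.9 up to 13, and ThrottleMath.RequestDelay(400, 13) comes out at roughly 1.95 seconds between requests, under the assumed budget of 400.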
Task scheduler to throttle/measure/synchronize requests:
private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;

public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
    _observable = _requests.Select(i => Observable.Empty<Action>()
            .Delay(requestDelay)
            .StartWith(i))
        .Concat()
        .ObserveOn(scheduler)
        .Subscribe(action => action());
}

public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
    var tcs = new TaskCompletionSource<T>();
    _requests.OnNext(async () =>
    {
        try
        {
            T result = await request();
            tcs.SetResult(result);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
        }
    });
    return tcs.Task;
}
So the delay is basically derived from a few constants taken from ResourceResponse<Document>.RequestCharge, but I still get the error from time to time.
What throttling/measuring/synchronization mechanism would work well here?
Upvotes: 1
Views: 1369
Reputation: 1118
Starting with .NET SDK 1.8.0, we automatically handle Request Rate too large exceptions to a reasonable extent (by default the SDK retries 9 times and honors the retry-after value returned by the server before each retry).
If you need greater control, you can configure RetryOptions on the ConnectionPolicy instance that you pass to the DocumentClient object, and we will override the default retry policy with it.
So you no longer need to add any custom logic for handling 429 exceptions in your application code like above.
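A rough sketch of that configuration might look like the following. It needs the Microsoft.Azure.DocumentDB SDK package; DocumentClientFactory is a hypothetical helper name, and the values 15 and 60 are purely illustrative, not recommendations.

```csharp
using System;
using Microsoft.Azure.Documents.Client;

public static class DocumentClientFactory
{
    // endpoint and authorizationKey are whatever the application already
    // passes to DocumentClient.
    public static DocumentClient Create(Uri endpoint, string authorizationKey)
    {
        var policy = new ConnectionPolicy();

        // Retry throttled (429) requests up to 15 times instead of the default 9.
        policy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 15;

        // Stop retrying once the cumulative wait across retries exceeds 60 seconds.
        policy.RetryOptions.MaxRetryWaitTimeInSeconds = 60;

        return new DocumentClient(endpoint, authorizationKey, policy);
    }
}
```

If the retries are exhausted, the 429 still surfaces to the caller as an exception, so a bulk import may want larger limits than an interactive request would.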
Upvotes: 2
Reputation: 117027
It seems to me that you should be able to do this with your SaveCategories method to make it work nicely with Rx:
public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });
    return
        Observable.Interval(requestDelay)
            .Zip(documents, (delay, doc) => doc)
            .SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
            .ToArray();
}
This gets rid of your IntervalTaskScheduler class entirely and limits the request rate to one request per requestDelay time span, while allowing each response to take as long as it needs. The .ToArray() call turns the IObservable<ResourceResponse<Document>> that returns many values into an IObservable<ResourceResponse<Document>[]> that returns a single array of values when the observable completes.
I couldn't test your code, so I tested a sample which I think simulates your code:
var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));
var sw = Stopwatch.StartNew();
var query =
    i.Zip(a, (ii, aa) => aa)
        .SelectMany(aa => Observable.Start(() =>
        {
            var x = sw.Elapsed.TotalMilliseconds;
            Thread.Sleep(r.Next(0, 5000));
            return x;
        }))
        .Select(x => new
        {
            started = x,
            ended = sw.Elapsed.TotalMilliseconds
        });
// Print the start and end time (in ms) of each simulated request.
query.Subscribe(x => Console.WriteLine("{0} {1}", x.started, x.ended));
I got this kind of result which shows that the requests were throttled:
4026.2983 5259.7043
2030.1287 6940.2326
6027.0439 9664.1045
8027.9993 10207.0579
10028.1762 12301.4746
12028.3190 12711.4440
14040.7972 17433.1964
16040.9267 17574.5924
18041.0529 19077.5545
Upvotes: 1
Reputation: 2728
When you get a 429 (Request rate too large), the response tells you how long to wait: the x-ms-retry-after-ms header holds the delay in milliseconds. Wait for that period before retrying. The SDK exposes the value as RetryAfter on the exception:
catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
    DocumentClientException dce = (DocumentClientException)ex.InnerException;
    switch ((int)dce.StatusCode)
    {
        case 429:
            Thread.Sleep(dce.RetryAfter);
            break;
        default:
            Console.WriteLine(" Failed: {0}", ex.InnerException.Message);
            throw;
    }
}
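The catch block above only waits; the request still has to be issued again afterwards. A minimal sketch of a complete wait-and-retry loop could look like this. To keep it runnable without the SDK it uses a hypothetical ThrottledException in place of a DocumentClientException with status code 429; Retry and WithRetryAfterAsync are likewise illustrative names, and the attempt limit of 10 is an arbitrary assumption.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a DocumentClientException whose StatusCode is 429.
public sealed class ThrottledException : Exception
{
    public TimeSpan RetryAfter { get; }

    public ThrottledException(TimeSpan retryAfter) : base("Request rate is large")
    {
        RetryAfter = retryAfter;
    }
}

public static class Retry
{
    // Runs the operation, and on a throttling error waits for the period the
    // server asked for before trying again. After maxAttempts attempts the
    // last throttling exception is allowed to propagate to the caller.
    public static async Task<T> WithRetryAfterAsync<T>(Func<Task<T>> operation, int maxAttempts = 10)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (ThrottledException ex) when (attempt < maxAttempts)
            {
                // Task.Delay frees the thread while waiting, unlike Thread.Sleep.
                await Task.Delay(ex.RetryAfter);
            }
        }
    }
}
```

In real code the filter would instead match DocumentClientException with a status code of 429 and read its RetryAfter property, as the catch block above does.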
Upvotes: 2