abatishchev

Reputation: 100248

How to properly throttle access to DocumentDB from WebJobs

I have an Azure WebJob with blob and queue triggers that saves data to Azure DocumentDB.

From time to time I'm getting an error:

Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}

Currently I throttle requests using this code. A WebJob function:

public async Task ParseCategoriesFromCsv(...)
{
    double find = 2.23, add = 5.9, replace = 10.67;
    double requestCharge = Math.Round(find + Math.Max(add, replace));

    await categoryProvider.SaveCategories(requestCharge, categories);
}

Category provider to manipulate document db client:

public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));

    var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx

    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });

    return await Task.WhenAll(documents.Select(async d =>
       await scheduler.ScheduleTask(
           () => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
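
To make the delay arithmetic concrete, here is a small standalone sketch of the same formula. The ThrottleMath class and the RequestUnits value of 1000 are made up for illustration; only the formula and the three charge constants come from the code above.

```csharp
using System;

public static class ThrottleMath
{
    // Mirrors the formula used in SaveCategories:
    // 60 / (requestUnits / requestCharge).
    public static TimeSpan DelayBetweenRequests(double requestUnits, double requestCharge)
    {
        return TimeSpan.FromSeconds(60.0 / (requestUnits / requestCharge));
    }

    public static void Main()
    {
        double find = 2.23, add = 5.9, replace = 10.67;

        // 2.23 + max(5.9, 10.67) = 12.9, which rounds to 13.
        double requestCharge = Math.Round(find + Math.Max(add, replace));

        // 1000 is a hypothetical RequestUnits value, not taken from a real collection.
        TimeSpan delay = DelayBetweenRequests(1000, requestCharge);

        // Roughly 780 ms between requests for these inputs.
        Console.WriteLine(delay.TotalMilliseconds);
    }
}
```

With these inputs the scheduler would space requests about 0.78 seconds apart, regardless of how many other functions are writing to the same collection at the same time.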

Task scheduler to throttle/measure/synchronize requests:

private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;

public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
    _observable = _requests.Select(i => Observable.Empty<Action>()
                                                  .Delay(requestDelay)
                                                  .StartWith(i))
                           .Concat()
                           .ObserveOn(scheduler)
                           .Subscribe(action => action());
}

public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
    var tcs = new TaskCompletionSource<T>();
    _requests.OnNext(async () =>
    {
        try
        {
            T result = await request();
            tcs.SetResult(result);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
        }
    });
    return tcs.Task;
}

So it's basically a number of constants from ResourceResponse<Document>.RequestCharge but:

What throttling/measuring/synchronization mechanism would work well here?

Upvotes: 1

Views: 1369

Answers (3)

Rajesh Nagpal

Reputation: 1118

Starting with .NET SDK 1.8.0, we automatically handle Request rate too large exceptions to a reasonable extent (the SDK retries 9 times by default and honors the retry-after interval returned by the server before each retry).

If you need greater control, you can configure the RetryOptions on the ConnectionPolicy instance that you pass to the DocumentClient object, and we will override the default retry policy with it.

So you no longer need to add any custom logic for handling 429 exceptions in your application code like above.
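
For reference, a minimal sketch of tuning those retry options, assuming the Microsoft.Azure.DocumentDB NuGet package at version 1.8.0 or later. The DocumentClientFactory class name and the values 15 and 60 are illustrative, not recommendations:

```csharp
using System;
using Microsoft.Azure.Documents.Client;

public static class DocumentClientFactory
{
    // endpoint and authorizationKey are placeholders supplied by the caller.
    public static DocumentClient Create(Uri endpoint, string authorizationKey)
    {
        var policy = new ConnectionPolicy();

        // Number of retries on throttled (429) requests; the default is 9.
        policy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 15;

        // Upper bound, in seconds, on the cumulative time spent waiting between retries.
        policy.RetryOptions.MaxRetryWaitTimeInSeconds = 60;

        return new DocumentClient(endpoint, authorizationKey, policy);
    }
}
```

Once both limits are exhausted, the exception still surfaces to the caller, so a bulk import may still want a fallback such as re-queuing the message.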

Upvotes: 2

Enigmativity

Reputation: 117027

It seems to me that you should be able to do this with your SaveCategories method to make it work nicely with Rx:

public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));

    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });

    return
        Observable.Interval(requestDelay)
            .Zip(documents, (delay, doc) => doc)
            .SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
            .ToArray();
}

This completely gets rid of your IntervalTaskScheduler class and ensures that you limit the request rate to one request per requestDelay time span, while allowing each response to take as long as it needs. The .ToArray() call turns the IObservable<ResourceResponse<Document>> that returns many values into an IObservable<ResourceResponse<Document>[]> that returns a single array of values when the observable completes.

I couldn't test your code, so I tested a sample which I think simulates your code:

var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));

var sw = Stopwatch.StartNew();

var query =
    i.Zip(a, (ii, aa) => aa)
        .SelectMany(aa => Observable.Start(() =>
        {
            var x = sw.Elapsed.TotalMilliseconds;
            Thread.Sleep(r.Next(0, 5000));
            return x;
        }))
        .Select(x => new
        {
            started = x,
            ended = sw.Elapsed.TotalMilliseconds
        });

I got this kind of result (start and end times in milliseconds), which shows that the requests were throttled:

 4026.2983  5259.7043 
 2030.1287  6940.2326 
 6027.0439  9664.1045 
 8027.9993 10207.0579 
10028.1762 12301.4746 
12028.3190 12711.4440 
14040.7972 17433.1964 
16040.9267 17574.5924 
18041.0529 19077.5545 

Upvotes: 1

Ryan CrawCour

Reputation: 2728

When you get a 429 (Request rate too large), the response tells you how long to wait: the x-ms-retry-after-ms header carries the delay in milliseconds. Wait for that period before retrying.

catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
    DocumentClientException dce = (DocumentClientException)ex.InnerException;
    switch ((int)dce.StatusCode)
    {
        case 429:
            Thread.Sleep(dce.RetryAfter);
            break;

        default:
            Console.WriteLine("  Failed: {0}", ex.InnerException.Message);
            throw;
    }
}
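
If the calling code uses await rather than .Wait() or .Result, the DocumentClientException is thrown directly instead of being wrapped in an AggregateException. A rough sketch of a retry loop for that case follows. The ThrottlingRetry helper and its maxAttempts parameter are hypothetical names, not part of the SDK:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

public static class ThrottlingRetry
{
    // Hypothetical helper: runs the operation and, on a 429, waits for the
    // server-suggested interval before trying again, up to maxAttempts tries.
    public static async Task<T> ExecuteAsync<T>(Func<Task<T>> operation, int maxAttempts = 10)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (DocumentClientException dce)
                when ((int?)dce.StatusCode == 429 && attempt < maxAttempts)
            {
                // RetryAfter is the delay the service asked the client to wait.
                await Task.Delay(dce.RetryAfter);
            }
            // Any other failure, or a 429 on the last attempt, propagates to the caller.
        }
    }
}
```

It could be used along the lines of `await ThrottlingRetry.ExecuteAsync(() => client.CreateDocumentAsync(collectionLink, doc))`. Task.Delay is used instead of Thread.Sleep so that the WebJob thread is not blocked while waiting.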

Upvotes: 2
