Reputation: 5127
We are using Azure Cosmos DB's API for MongoDB with the MongoDB C# driver. Microsoft's official SDK works only with the Cosmos DB SQL API (DocumentDB) and has a built-in ability to retry write operations; it uses the `DocumentClientException.RetryAfter` property as the delay. However, with the MongoDB driver this is not possible. It only returns an error such as:
Command insert failed: Message: {"Errors":["Request rate is large"]}.
Currently we wait a fixed 1 second before retrying whenever a write exception occurs. This is not very clean and adds a fixed delay to every throttled bulk write operation. Because the delay does not reflect how long the server actually wants us to back off, we sometimes delay writing the next batches of records longer than necessary.
How do we handle this?
Upvotes: 1
Views: 444
Reputation: 1286
In Cosmos DB, when you're using a v3.2 MongoDB endpoint, no retry information is returned, so you basically have to guess and/or use some form of exponential backoff. The exceptions you have to handle in this scenario include `MongoCommandException` and `MongoExecutionTimeoutException`.
With a v3.6 endpoint, Cosmos DB does return `RetryAfterMs=` information as part of either a `MongoBulkWriteException` or a `MongoWriteException`. Unfortunately, as far as I can tell, you have to extract this value from the error message, because there is no property containing it.
Note: this may be an issue on non-English systems, because error messages could be translated.
Below is what I've implemented using Polly. Both endpoint versions are supported, and the v3.6 policy is backward compatible with v3.2.
You may wish to tweak some of the values (retry count, back-off base, maximum delay) for your scenario.
/// <summary>
/// Polly retry policies for Azure Cosmos DB's API for MongoDB, which reports request-rate
/// throttling through MongoDB error code <see cref="RateLimitCode"/> (16500).
/// </summary>
public static class Policies
{
    /// <summary>Value of the command result's "StatusCode" field that marks a throttled request (HTTP 429).</summary>
    public const int HttpThrottleErrorCode = 429;

    /// <summary>Additional "StatusCode" value that <see cref="MongoCommandExceptionPolicy"/> treats as retryable.</summary>
    public const int HttpServiceIsUnavailable = 1;

    /// <summary>Code/"StatusCode" value treated as a retryable time-limit failure by the command and execution-timeout policies.</summary>
    public const int HttpOperationExceededTimeLimit = 50;

    /// <summary>MongoDB error code Cosmos DB returns when the request rate is too large.</summary>
    public const int RateLimitCode = 16500;

    /// <summary>Token that precedes the server-suggested delay (in milliseconds) inside error messages.</summary>
    public const string RetryAfterToken = "RetryAfterMs=";

    /// <summary>Number of retries each individual policy performs before giving up.</summary>
    public const int MaxRetries = 10;

    public static readonly int RetryAfterTokenLength = RetryAfterToken.Length;

    // System.Random is not thread-safe, and these static policies can execute concurrently.
    // All access to JitterSeed goes through NextJitterMs, which serializes it on JitterLock.
    private static readonly Random JitterSeed = new Random();
    private static readonly object JitterLock = new object();

    /// <summary>A no-op policy, for callers that want to disable retries.</summary>
    public static readonly IAsyncPolicy NoPolicy = Policy.NoOpAsync();

    /// <summary>
    /// Builds a sleep-duration provider for exponential back-off with jitter:
    /// <c>min(exponentialBackoffInSeconds ^ retryAttempt, maxBackoffTimeInSeconds)</c> seconds
    /// plus a random 0–999 ms.
    /// </summary>
    /// <param name="exponentialBackoffInSeconds">Base of the exponent, e.g. 1.5 gives 1.5s, 2.25s, 3.4s, ... for attempts 1, 2, 3.</param>
    /// <param name="maxBackoffTimeInSeconds">Upper bound for the exponential part of the delay.</param>
    /// <returns>A function mapping the retry attempt number to the delay before that attempt.</returns>
    public static Func<int, TimeSpan> SleepDurationProviderWithJitter(double exponentialBackoffInSeconds, int maxBackoffTimeInSeconds) => retryAttempt
        => TimeSpan.FromSeconds(Math.Min(Math.Pow(exponentialBackoffInSeconds, retryAttempt), maxBackoffTimeInSeconds)) // capped exponential back-off
           + TimeSpan.FromMilliseconds(NextJitterMs(0, 1000)); // plus jitter: up to 1 second

    /// <summary>Default back-off: base 1.5 seconds, exponential part capped at 23 seconds, plus jitter.</summary>
    public static readonly Func<int, TimeSpan> DefaultSleepDurationProviderWithJitter =
        SleepDurationProviderWithJitter(1.5, 23);

    /// <summary>
    /// Retries <c>MongoCommandException</c>s with code <see cref="RateLimitCode"/>.
    /// If the result carries an Int32 "StatusCode", only 429, 1 and 50 are retried.
    /// Otherwise an "IsValid" boolean, if present, decides; if neither field is present, the exception is retried.
    /// Uses <see cref="DefaultSleepDurationProviderWithJitter"/> because v3.2 endpoints give no retry hint.
    /// </summary>
    public static readonly IAsyncPolicy MongoCommandExceptionPolicy = Policy
        .Handle<MongoCommandException>(e =>
        {
            if (e.Code != RateLimitCode || !(e.Result is BsonDocument bsonDocument))
            {
                return false;
            }

            if (bsonDocument.TryGetValue("StatusCode", out var statusCode) && statusCode.IsInt32)
            {
                switch (statusCode.AsInt32)
                {
                    case HttpThrottleErrorCode:
                    case HttpServiceIsUnavailable:
                    case HttpOperationExceededTimeLimit:
                        return true;
                    default:
                        return false;
                }
            }

            if (bsonDocument.TryGetValue("IsValid", out var isValid) && isValid.IsBoolean)
            {
                return isValid.AsBoolean;
            }

            return true;
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            DefaultSleepDurationProviderWithJitter
        );

    /// <summary>Retries <c>MongoExecutionTimeoutException</c>s whose code is 16500 or 50, with default back-off.</summary>
    public static readonly IAsyncPolicy ExecutionTimeoutPolicy = Policy
        .Handle<MongoExecutionTimeoutException>(e =>
            e.Code == RateLimitCode || e.Code == HttpOperationExceededTimeLimit
        )
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries rate-limited <c>MongoWriteException</c>s (directly or via an inner <c>MongoBulkWriteException</c>).
    /// Waits for the server's <c>RetryAfterMs</c> hint when one is found in the exception chain; otherwise uses default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoWriteExceptionPolicy = Policy
        .Handle<MongoWriteException>(e =>
        {
            return e.WriteError?.Code == RateLimitCode
                || (e.InnerException is MongoBulkWriteException bulkException &&
                    bulkException.WriteErrors.Any(error => error.Code == RateLimitCode));
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
                ExtractTimeToWait(e) ?? DefaultSleepDurationProviderWithJitter(retryAttempt),
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// Retries <c>MongoBulkWriteException</c>s that contain a rate-limited write error.
    /// Waits for the server's <c>RetryAfterMs</c> hint when one is found in the exception chain; otherwise uses default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoBulkWriteExceptionPolicy = Policy
        .Handle<MongoBulkWriteException>(e =>
        {
            return e.WriteErrors.Any(error => error.Code == RateLimitCode);
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
                ExtractTimeToWait(e) ?? DefaultSleepDurationProviderWithJitter(retryAttempt),
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>Returns a random integer in [minInclusive, maxExclusive), serialized so concurrent retries cannot corrupt the shared Random.</summary>
    private static int NextJitterMs(int minInclusive, int maxExclusive)
    {
        lock (JitterLock)
        {
            return JitterSeed.Next(minInclusive, maxExclusive);
        }
    }

    /// <summary>
    /// Searches <paramref name="exception"/> and its inner-exception chain for a <c>RetryAfterMs=</c> hint.
    /// </summary>
    /// <returns>The first hint found (plus jitter), or null when none of the messages contain one.</returns>
    private static TimeSpan? ExtractTimeToWait(Exception exception)
    {
        for (var current = exception; current != null; current = current.InnerException)
        {
            var timeToWait = ExtractTimeToWait(current.Message);
            if (timeToWait.HasValue)
            {
                return timeToWait;
            }
        }

        return null;
    }

    /// <summary>
    /// RetryAfterMs is not exposed as a property by the driver, so it has to be parsed out of the message text.
    /// Reads the run of ASCII digits that follows <see cref="RetryAfterToken"/>, whether it is followed by a comma,
    /// other text, or the end of the message.
    /// </summary>
    /// <returns>The suggested delay plus 100–999 ms of jitter, or null if no parsable hint is present.</returns>
    private static TimeSpan? ExtractTimeToWait(string messageToParse)
    {
        if (string.IsNullOrEmpty(messageToParse))
        {
            return null;
        }

        var tokenPos = messageToParse.IndexOf(RetryAfterToken, StringComparison.OrdinalIgnoreCase);
        if (tokenPos < 0)
        {
            return null;
        }

        var start = tokenPos + RetryAfterTokenLength;
        while (start < messageToParse.Length && char.IsWhiteSpace(messageToParse[start]))
        {
            start++;
        }

        var end = start;
        while (end < messageToParse.Length && messageToParse[end] >= '0' && messageToParse[end] <= '9')
        {
            end++;
        }

        // Digits only, culture-invariant; overflow makes TryParse fail and we fall back to default back-off.
        if (end == start
            || !int.TryParse(
                messageToParse.Substring(start, end - start),
                System.Globalization.NumberStyles.None,
                System.Globalization.CultureInfo.InvariantCulture,
                out var timeToWaitInMs))
        {
            return null;
        }

        return TimeSpan.FromMilliseconds(timeToWaitInMs)
            + TimeSpan.FromMilliseconds(NextJitterMs(100, 1000));
    }

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_2 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy);

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.6 or V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_6 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy, MongoWriteExceptionPolicy, MongoBulkWriteExceptionPolicy);
}
// Settable policy, initialised to Policies.DefaultPolicyForMongo3_6 and replaceable through the setter.
// NOTE(review): in this snippet the property sits after the closing brace of Policies, outside any type;
// it must be declared inside a class (presumably the one that executes Mongo operations — confirm) to compile.
public static IAsyncPolicy DefaultPolicy { get; set; } = Policies.DefaultPolicyForMongo3_6;
Upvotes: 1