I need to introduce a retry policy to the workflow. Let's say there are 3 blocks that are connected like this:
```csharp
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);
buffer.LinkTo(processing);
processing.LinkTo(send);
```
So there is a buffer that accumulates data and sends it to the transform block, which processes no more than 3 items at a time, and the results are then sent to the action block.
Transient errors can occur while the transform block is processing, and when an error is transient I want to retry the operation several times.
I know that blocks in general are not retryable (although the delegates passed into the blocks could be made retryable). One option is to wrap the delegate so that it supports retrying.
I also know about TransientFaultHandling.Core, a very good library that provides retry mechanisms for transient faults. It is an excellent library, but it doesn't fit my case. If I wrap the delegate passed to the transform block in the RetryPolicy.ExecuteAsync method, the message stays locked inside the transform block, and until the retry either completes or fails, the block won't be able to process a new message. Imagine that all 3 messages fail and enter the retry phase (let's say the next retry attempt is in 2 minutes): the transform block will be stuck until at least one message leaves it.
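For reference, the delegate-wrapping approach described above might look roughly like the sketch below. RetryHelper, RetryAsync and WithRetry are hypothetical names (this is not the TransientFaultHandling.Core API), and for simplicity every exception is treated as transient. Because the delay is awaited inside the delegate, the block's processing slot stays occupied for the whole wait, which is the stall described above.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of TransientFaultHandling.Core or TPL Dataflow.
// Assumption: every exception is treated as transient.
public static class RetryHelper
{
    public static async Task<TOutput> RetryAsync<TInput, TOutput>(
        Func<TInput, Task<TOutput>> operation, TInput input,
        int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(input).ConfigureAwait(false);
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Awaited inside the delegate: the block's slot stays busy
                // until this item either succeeds or runs out of attempts.
                await Task.Delay(delay).ConfigureAwait(false);
            }
            // On the last attempt the filter is false and the exception propagates.
        }
    }

    // Wraps a delegate so it can be passed straight to a TransformBlock.
    public static Func<TInput, Task<TOutput>> WithRetry<TInput, TOutput>(
        Func<TInput, Task<TOutput>> operation, int maxAttempts, TimeSpan delay)
        => input => RetryAsync(operation, input, maxAttempts, delay);
}
```

Passing `RetryHelper.WithRetry(transform, 3, TimeSpan.FromMinutes(2))` as the delegate of `processing` (with `transform` standing in for the elided processing function) would reproduce the problem: three failing items keep all three slots busy during their delays.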
The only solution I see is to extend TransformBlock (actually, ITargetBlock would be enough too) and do the retry manually (like here):
```csharp
do {
    try { return await transform(input); }
    catch {
        if (numRetries <= 0) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
} while (numRetries-- > 0);
```
i.e. to post the message back into the transform block after a delay. But in that case the retry context (number of retries left, etc.) would also have to be passed into the block. That sounds too complex...
Does anyone see a simpler approach to implementing a retry policy for a workflow block?
Here are two methods, CreateRetryTransformBlock and CreateRetryActionBlock, that operate under several assumptions, among them that the caller wants to be able to set all the usual options of a normal block, including MaxDegreeOfParallelism, BoundedCapacity, CancellationToken and EnsureOrdered, on top of the options related to the retry functionality. The implementation below uses a SemaphoreSlim to enforce a common concurrency limit on both operations that are attempted for the first time and previously faulted operations that are retried after their delay has elapsed; the delay itself is awaited without holding the semaphore.
```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The delay duration before retrying an item.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public readonly struct RetryResult<TInput, TOutput>
{
    public readonly TInput Input { get; }
    public readonly TOutput Output { get; }
    public readonly bool Success { get; }
    public readonly Exception[] Exceptions { get; }

    public bool Failed => !Success;
    public Exception FirstException => Exceptions != null ? Exceptions[0] : null;
    public int Attempts =>
        Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;

    public RetryResult(TInput input, TOutput output, bool success,
        Exception[] exceptions)
    {
        Input = input;
        Output = output;
        Success = success;
        Exceptions = exceptions;
    }
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

// The two factory methods below go inside a static class of your choice.
public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.RetryDelay));
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var exceptionsCount = 0;
    // Shared by first attempts and retries, so both respect the same limit
    var semaphore = new SemaphoreSlim(
        dataflowBlockOptions.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded ?
            Int32.MaxValue : dataflowBlockOptions.MaxDegreeOfParallelism);

    async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
    {
        await semaphore.WaitAsync(); // Preserve the SynchronizationContext
        try
        {
            var result = await transform(item).ConfigureAwait(false);
            return (result, null);
        }
        catch (Exception ex)
        {
            if (maxRetriesTotal != -1)
            {
                if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                {
                    throw new RetryLimitException($"The max retry limit " +
                        $"({maxRetriesTotal}) has been reached.", ex);
                }
            }
            return (default, ex);
        }
        finally
        {
            semaphore.Release();
        }
    }

    async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
        TInput item)
    {
        // Creates a two-stage operation. Preserves the context on every await.
        var (result, firstException) = await ProcessOnceAsync(item);
        if (firstException == null) return Task.FromResult(
            new RetryResult<TInput, TOutput>(item, result, true, null));
        return RetryStageAsync();

        async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
        {
            var exceptions = new List<Exception>();
            exceptions.Add(firstException);
            for (int i = 2; i <= maxAttemptsPerItem; i++)
            {
                // The delay is awaited outside of the semaphore
                await Task.Delay(retryDelay, cancellationToken);
                var (result, exception) = await ProcessOnceAsync(item);
                if (exception != null)
                    exceptions.Add(exception);
                else
                    return new RetryResult<TInput, TOutput>(item, result,
                        true, exceptions.ToArray());
            }
            return new RetryResult<TInput, TOutput>(item, default, false,
                exceptions.ToArray());
        }
    }

    // The input block awaits the first stage of each operation
    var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
        item => ProcessWithRetryAsync(item), dataflowBlockOptions);

    // The output block awaits the second (and final) stage of each operation
    var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
        RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);

    input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

    // In case of failure ensure that the input block is faulted too,
    // so that its input/output queues are emptied, and any pending
    // SendAsync operations are aborted
    PropagateFailure(output, input);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { block2.Fault(ex); }
    }
}

public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    // Discard the results so that the output buffer does not fill up
    var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
    block.LinkTo(nullTarget);
    return block;
}
```
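In addition to svick's excellent answer, there are a couple of other options:
- Keep using a retrying delegate (for example one built with TransientFaultHandling.Core), and just set MaxDegreeOfParallelism to Unbounded so that the other messages can get through.
- Have the block output failure/retry information along with each result, and link the block back to itself with a LinkTo predicate that examines whether another retry is necessary. This approach is more complex; you'd have to add a delay to your block if it is doing a retry, and add a TransformBlock to remove the failure/retry information for the rest of the mesh.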
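A minimal sketch of the second option, under stated assumptions: Attempt<TIn, TOut> and RetryCycle.Create are hypothetical names, every exception is treated as transient, and, as a variation on putting the delay in the processing block itself, the delay lives in its own unbounded block inside the cycle so that waiting items do not hold processing slots. The LinkTo predicates decide whether an attempt loops back for another try or leaves the cycle, and the final TransformBlock strips the retry information.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical envelope that carries the failure/retry information around the cycle.
public sealed class Attempt<TIn, TOut>
{
    public TIn Input;
    public TOut Output;
    public Exception Error;   // null after a successful attempt
    public int AttemptsLeft;
    public bool Done => Error == null || AttemptsLeft <= 0;
}

public static class RetryCycle
{
    public static IPropagatorBlock<TIn, (TIn Input, TOut Output, Exception Error)>
        Create<TIn, TOut>(Func<TIn, Task<TOut>> transform, int maxAttempts,
            TimeSpan retryDelay, int maxDegreeOfParallelism)
    {
        // Wraps each incoming item in an envelope with its attempt budget.
        var wrap = new TransformBlock<TIn, Attempt<TIn, TOut>>(item =>
            new Attempt<TIn, TOut> { Input = item, AttemptsLeft = maxAttempts });

        // Runs one attempt; at most maxDegreeOfParallelism attempts at a time.
        var process = new TransformBlock<Attempt<TIn, TOut>, Attempt<TIn, TOut>>(
            async a =>
            {
                try { a.Output = await transform(a.Input); a.Error = null; }
                catch (Exception ex) { a.Error = ex; }
                a.AttemptsLeft--;
                return a;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // Items waiting for a retry sit here, not in the processing slots.
        var delay = new TransformBlock<Attempt<TIn, TOut>, Attempt<TIn, TOut>>(
            async a => { await Task.Delay(retryDelay); return a; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // Removes the failure/retry information for the rest of the mesh.
        var strip = new TransformBlock<Attempt<TIn, TOut>, (TIn Input, TOut Output, Exception Error)>(
            a => (a.Input, a.Output, a.Error));

        wrap.LinkTo(process);
        process.LinkTo(delay, a => !a.Done);  // another retry is necessary
        process.LinkTo(strip, a => a.Done);   // succeeded or out of attempts
        delay.LinkTo(process);                // closes the cycle
        return DataflowBlock.Encapsulate(wrap, strip);
    }
}
```

Completion is deliberately not wired up here: because the cycle feeds itself, PropagateCompletion cannot tell when the last retry has finished, so a real pipeline would need extra bookkeeping for that.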
I think you pretty much have to do that: you have to track the remaining number of retries for each message, and you have to schedule the retry attempt somehow.
But you could make this better by encapsulating it in a separate method. Something like:
```csharp
// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
    public T Data;
    public int RetriesRemaining;
    public readonly List<Exception> Exceptions = new List<Exception>();
}

public static IPropagatorBlock<TInput, TOutput>
    CreateRetryingBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, int numberOfRetries,
    TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
    var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
        input => new RetryingMessage<TInput>
        { Data = input, RetriesRemaining = numberOfRetries });
    // TransformManyBlock, so that we can propagate zero results on failure
    TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
        async message =>
        {
            try { return new[] { await transform(message.Data) }; }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                    failureHandler(message.Exceptions);
                else
                {
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        });
    source.LinkTo(
        target, new DataflowLinkOptions { PropagateCompletion = true });
    return DataflowBlock.Encapsulate(source, target);
}
```
I have added code to track the exceptions, because I think that failures should not be ignored; at the very least, they should be logged.
Also, this code doesn't work very well with completion: if there are retries waiting for their delay and you Complete() the block, it will complete immediately and the retries will be lost. If that's a problem for you, you will have to track outstanding retries and complete target when source completes and no retries are waiting.
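One way to do that tracking, sketched under assumptions: CompletingRetryBlock and Create are hypothetical names, and the counter is a variation on the code above rather than part of it. Each message counts as outstanding from the moment it enters until it either succeeds or exhausts its retries, and the source itself counts as one extra outstanding unit until it completes. Whoever drops the counter to zero calls target.Complete(), so target is completed only after source has completed and no retries are waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletingRetryBlock
{
    private class RetryingMessage<T>
    {
        public T Data;
        public int RetriesRemaining;
        public readonly List<Exception> Exceptions = new List<Exception>();
    }

    public static IPropagatorBlock<TInput, TOutput> Create<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int numberOfRetries,
        TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
    {
        // The source counts as one outstanding unit until it completes;
        // each message counts as one until it succeeds or gives up.
        int outstanding = 1;
        TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;

        // Whoever drops the count to zero completes the target.
        void Release()
        {
            if (Interlocked.Decrement(ref outstanding) == 0) target.Complete();
        }

        var source = new TransformBlock<TInput, RetryingMessage<TInput>>(input =>
        {
            Interlocked.Increment(ref outstanding);
            return new RetryingMessage<TInput> { Data = input, RetriesRemaining = numberOfRetries };
        });

        target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(async message =>
        {
            try
            {
                var result = await transform(message.Data);
                Release(); // finished successfully
                return new[] { result };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                    Release(); // finished with a failure
                }
                else
                {
                    message.RetriesRemaining--;
                    // Still outstanding, so target cannot have been completed yet.
                    _ = Task.Delay(retryDelay).ContinueWith(
                        t => target.Post(message), TaskScheduler.Default);
                }
                return Array.Empty<TOutput>();
            }
        });

        // No PropagateCompletion: the source's completion only releases its unit.
        source.LinkTo(target);
        source.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)target).Fault(t.Exception);
            else Release();
        }, TaskScheduler.Default);

        return DataflowBlock.Encapsulate(source, target);
    }
}
```

With this variant, calling Complete() while retries are still waiting on their delay no longer drops them: the block stays open until every message has either produced a result or been handed to failureHandler.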