Dima

Reputation: 1

TPL DataFlow proper way to handle exceptions

I have a problem in a Windows service that uses TPL Dataflow to manage a queue (database) and redirect work to a grid computing service. At some point the BufferBlock stops releasing tasks, and I am not sure why. I think some tasks throw exceptions during execution, but the exceptions get suppressed, so it is difficult to tell at which point the BufferBlock stops accepting new tasks.

I tried to simplify it in the working example below. It doesn't have any exception handling, and I am wondering how to properly handle exceptions in TPL. I found something similar here: TPL Dataflow, guarantee completion only when ALL source data blocks completed. In this example I have 100 requests and process the data in batches of 10 requests. I emulate an exception that is thrown when ID % 9 == 0. If I don't catch this exception, it works for a bit and then stops accepting new requests. If I handle the exception and return Result.Failure, I believe it works fine, but I'm not sure whether that is the proper way to do it in a production environment.

I'm new to TPL, so forgive me if I haven't explained my question clearly. GitHub Project

[Image: Empty Slots]

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using CSharpFunctionalExtensions;

namespace TestTPL
{
    public class ServicePipeline
    {
        public const int batches = 100;
        private int currentBatch = 0;

        public ServicePipeline(int maxRequestsInParallel)
        {
            MaxRequestsInParallel = maxRequestsInParallel;
        }

        public int MaxRequestsInParallel { get; }
        public BufferBlock<MyData> QueueBlock { get; private set; }
        public List<TransformBlock<MyData, Result>> ExecutionBlocks
            { get; private set; }
        public ActionBlock<Result> ResultBlock { get; private set; }

        private void Init()
        {
            QueueBlock = new BufferBlock<MyData>(new DataflowBlockOptions()
                { BoundedCapacity = MaxRequestsInParallel });
            ExecutionBlocks = new List<TransformBlock<MyData, Result>>();
            ResultBlock = new ActionBlock<Result>(_ => _.OnFailure(
                () => Console.WriteLine($"Error: {_.Error}")));

            for (int blockIndex = 0; blockIndex < MaxRequestsInParallel;
                blockIndex++)
            {
                var executionBlock = new TransformBlock<MyData, Result>((d) =>
                {
                    return ExecuteAsync(d);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
                executionBlock.LinkTo(ResultBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                ExecutionBlocks.Add(executionBlock);
            }
        }

        public static Result ExecuteAsync(MyData myData)
        {
            //try
            //{
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => web.DownloadStringAsync(
                new Uri("http://localhost:49182/Slow.ashx")));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
            //}
            //catch (Exception ex)
            //{
            //    return Result.Failure($"Exception: {ex.Message}");
            //}
        }

        public async void Start()
        {
            Init();
            while (currentBatch < batches)
            {
                Thread.Sleep(1000);
                await SubmitNextRequests();
            }
            Console.WriteLine($"Completed: {batches}");
        }

        private async Task<int> SubmitNextRequests()
        {
            var emptySlots = MaxRequestsInParallel - QueueBlock.Count;
            Console.WriteLine($"Empty slots: {emptySlots}" +
                $", left = {batches - currentBatch}");
            if (emptySlots > 0)
            {
                var dataRequests = await GetNextRequests(emptySlots);
                foreach (var data in dataRequests)
                {
                    await QueueBlock.SendAsync(data);
                }
            }
            return emptySlots;
        }

        private async Task<List<MyData>> GetNextRequests(int request)
        {
            MyData[] myDatas = new MyData[request];
            Task<List<MyData>> task = Task<List<MyData>>.Run(() =>
            {
                for (int i = 0; i < request; i++)
                {
                    // Index with i only; the loop header already increments it.
                    myDatas[i] = new MyData(currentBatch);
                    currentBatch++;
                }
                return new List<MyData>(myDatas);
            });
            return await task;
        }
    }

    public class MyData
    {
        public int Id { get; set; }
        public MyData(int id) => Id = id;
        public override string ToString() { return Id.ToString(); }
    }
}

EDIT: 10/30/2019 It works as expected when the exception is caught and Result.Failure($"Exception: {ex.Message}") is returned explicitly:

    public static Result ExecuteAsync(MyData myData)
    {
        try
        {
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => Thread.Sleep(2000));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Failure($"Exception: {ex.Message}");
        }
    }

Upvotes: 0

Views: 1540

Answers (1)

Theodor Zoulias

Reputation: 43545

When linking two blocks, there is an option to propagate completion forward, but not backward. This becomes a problem when the BoundedCapacity option is used and an error occurs: the failed block stops accepting messages, which can block the feeder of the pipeline and cause a deadlock. It is quite easy to propagate completion manually, though. Here is a method that you can use:

async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted) block2.Complete();
}

It waits asynchronously for block1 to complete, and if block1 has failed, it immediately completes block2. Here block1 is the downstream block and block2 is the upstream block that feeds it. Completing the upstream block is usually enough, but you can also propagate the specific exception if you want:

async void OnErrorPropagate(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted)
        block2.Fault(block1.Completion.Exception.InnerException);
}
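
To show how such a helper might be wired into a bounded pipeline, here is a minimal sketch. It is not the code from the question: the Demo class, the RunAsync method, the int payload, and the rule that item 3 fails are all made up for illustration. It assumes a console application (no synchronization context) with the System.Threading.Tasks.Dataflow package available.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Demo
{
    // Helper from the answer: block1 is the downstream block, block2 the upstream one.
    static async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
    {
        await Task.WhenAny(block1.Completion); // Does not throw if block1 faulted
        if (block1.Completion.IsFaulted) block2.Complete();
    }

    // Hypothetical pipeline: returns how many of the 10 items were accepted.
    public static async Task<int> RunAsync()
    {
        var queue = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 2 });
        var worker = new ActionBlock<int>(id =>
        {
            // Emulated failure, similar in spirit to the ID % 9 check in the question.
            if (id == 3) throw new InvalidOperationException("Test");
            Console.WriteLine($"Processed {id}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        queue.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });
        OnErrorComplete(worker, queue); // Backward propagation when the worker fails

        int accepted = 0;
        for (int id = 0; id < 10; id++)
        {
            // SendAsync returns false once the queue has been completed,
            // so the feeder can stop instead of waiting forever.
            if (!await queue.SendAsync(id)) break;
            accepted++;
        }
        queue.Complete();

        try { await worker.Completion; }
        catch (Exception ex) { Console.WriteLine($"Worker failed: {ex.Message}"); }
        return accepted;
    }

    public static async Task Main()
    {
        int accepted = await RunAsync();
        Console.WriteLine($"Accepted {accepted} of 10 items");
    }
}
```

Without the OnErrorComplete call, the feeder loop in this sketch would be expected to hang on SendAsync once the queue fills up behind the failed worker. With it, the loop ends early; exactly how many items are accepted before that depends on timing.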

Upvotes: 2
