Kaveh Shahbazian

Reputation: 13523

TPL Dataflow Blocks

Question: Why does using a WriteOnceBlock (or BufferBlock) to get an answer back (as a sort of callback) from another BufferBlock<Action<T>> cause a deadlock in this code? The answer is sent back from inside the posted Action.

I thought that the methods of a class can be considered messages that we send to the object (the original view of OOP proposed by, I think, Alan Kay). So I wrote this generic Actor class, which helps convert an ordinary object into an Actor. (Of course there are plenty of unseen loopholes here because of mutability and so on, but that's not the main concern.)

So we have these definitions:

public class Actor<T>
{
    private readonly T _processor;
    private readonly BufferBlock<Action<T>> _messageBox = new BufferBlock<Action<T>>();

    public Actor(T processor)
    {
        _processor = processor;
        Run();
    }

    public event Action<T> Send
    {
        add { _messageBox.Post(value); }
        remove { }
    }

    private async void Run()
    {
        while (true)
        {
            var action = await _messageBox.ReceiveAsync();
            action(_processor);
        }
    }
}

public interface IIdGenerator
{
    long Next();
}

Now, why does this code work:

static void Main(string[] args)
{
    var idGenerator1 = new IdInt64();

    var idServer1 = new Actor<IIdGenerator>(idGenerator1);

    const int n = 1000;
    for (var i = 0; i < n; i++)
    {
        var t = new Task(() =>
        {
            var answer = new WriteOnceBlock<long>(null);

            Action<IIdGenerator> action = x =>
            {
                var buffer = x.Next();

                answer.Post(buffer);
            };

            idServer1.Send += action;

            Trace.WriteLine(answer.Receive());
        }, TaskCreationOptions.LongRunning); // Runs on a separate new thread
        t.Start();
    }

    Console.WriteLine("press any key you like! :)");
    Console.ReadKey();

    Trace.Flush();
}

And this code does not work:

static void Main(string[] args)
{
    var idGenerator1 = new IdInt64();

    var idServer1 = new Actor<IIdGenerator>(idGenerator1);

    const int n = 1000;
    for (var i = 0; i < n; i++)
    {
        var t = new Task(() =>
        {
            var answer = new WriteOnceBlock<long>(null);

            Action<IIdGenerator> action = x =>
            {
                var buffer = x.Next();

                answer.Post(buffer);
            };

            idServer1.Send += action;

            Trace.WriteLine(answer.Receive());
        }, TaskCreationOptions.PreferFairness); // Runs and is managed by Task Scheduler 
        t.Start();
    }

    Console.WriteLine("press any key you like! :)");
    Console.ReadKey();

    Trace.Flush();
}

The only difference between the two versions is the TaskCreationOptions used to create the tasks. Maybe I have misunderstood some TPL Dataflow concept, since I have only just started using it (is there a [ThreadStatic] hidden somewhere?).

Upvotes: 2

Views: 1104

Answers (1)

i3arnon

Reputation: 116548

The problem in your code is this call: answer.Receive(). If you move it inside the action, the deadlock doesn't happen:

var t = new Task(() =>
{
    var answer = new WriteOnceBlock<long>(null);
    Action<IIdGenerator> action = x =>
    {
        var buffer = x.Next();
        answer.Post(buffer);
        Trace.WriteLine(answer.Receive());
    };
    idServer1.Send += action;
});
t.Start();

So why is that? answer.Receive(), as opposed to await answer.ReceiveAsync(), blocks the calling thread until an answer is returned. With TaskCreationOptions.LongRunning each task gets its own thread, so there's no problem. Without it (TaskCreationOptions.PreferFairness is irrelevant here) the tasks run on thread pool threads, and all of those threads end up blocked waiting in Receive(). That leaves few or none free for the actor's Run loop, which also needs a pool thread to resume after its await, so everything is much slower. It doesn't actually deadlock, as you can see if you use 15 instead of 1000.
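To get a feel for how many blocked tasks the pool can absorb before it starts throttling, you can print its limits. This is only a small sketch using the standard ThreadPool.GetMinThreads and ThreadPool.GetMaxThreads calls; the PoolInfo class and MinWorkerThreads helper are names made up for this example, and the numbers depend on your machine and runtime.

```csharp
using System;
using System.Threading;

public static class PoolInfo
{
    // Hypothetical helper: returns the worker-thread minimum, i.e. how many
    // threads the pool creates on demand before it begins to throttle
    // the creation of additional threads.
    public static int MinWorkerThreads()
    {
        ThreadPool.GetMinThreads(out int workers, out _);
        return workers;
    }

    public static void Main()
    {
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);
        Console.WriteLine($"min worker threads: {MinWorkerThreads()}");
        Console.WriteLine($"max worker threads: {maxWorkers}");
        Console.WriteLine($"logical processors: {Environment.ProcessorCount}");
    }
}
```

If the minimum is far below the number of tasks that block at the same time (1000 in the question), most of them have to wait for the pool to add threads gradually, which matches the slowdown described above.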

Two other fixes help illustrate the problem:

  • Increasing the thread pool's minimum size with ThreadPool.SetMinThreads(1000, 0); before running the original code.
  • Using ReceiveAsync:

Task.Run(async () =>
{
    var answer = new WriteOnceBlock<long>(null);
    Action<IIdGenerator> action = x =>
    {
        var buffer = x.Next();
        answer.Post(buffer);
    };
    idServer1.Send += action;
    Trace.WriteLine(await answer.ReceiveAsync());
});
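For reference, here is one way the asynchronous variant might look as a complete program. Treat it as a sketch under assumptions rather than the exact code from the question: CountingIdGenerator is a hypothetical stand-in for IdInt64 (whose source isn't shown), the Demo class and RequestIdsAsync are invented for the example, and the Run loop returns a Task instead of being async void. It assumes the System.Threading.Tasks.Dataflow library is available to the project.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public interface IIdGenerator
{
    long Next();
}

// Hypothetical stand-in for the question's IdInt64 (not shown in the post).
public sealed class CountingIdGenerator : IIdGenerator
{
    private long _last;
    public long Next() => Interlocked.Increment(ref _last);
}

public sealed class Actor<T>
{
    private readonly T _processor;
    private readonly BufferBlock<Action<T>> _messageBox = new BufferBlock<Action<T>>();

    public Actor(T processor)
    {
        _processor = processor;
        _ = RunAsync(); // start the message loop; it never completes
    }

    // Same trick as in the question: "subscribing" posts the action as a message.
    public event Action<T> Send
    {
        add { _messageBox.Post(value); }
        remove { }
    }

    private async Task RunAsync()
    {
        while (true)
        {
            var action = await _messageBox.ReceiveAsync();
            action(_processor);
        }
    }
}

public static class Demo
{
    // Sends n requests to the actor and awaits every reply without blocking
    // a thread pool thread while waiting.
    public static async Task<long[]> RequestIdsAsync(int n)
    {
        var idServer = new Actor<IIdGenerator>(new CountingIdGenerator());

        var requests = Enumerable.Range(0, n).Select(_ => Task.Run(async () =>
        {
            var answer = new WriteOnceBlock<long>(null);
            idServer.Send += x => answer.Post(x.Next());
            return await answer.ReceiveAsync();
        }));

        return await Task.WhenAll(requests);
    }

    public static async Task Main()
    {
        var ids = await RequestIdsAsync(1000);
        Console.WriteLine($"received {ids.Length} ids, {ids.Distinct().Count()} distinct");
    }
}
```

Because each task awaits its reply instead of calling Receive(), the pool thread is released while the actor works, so the default pool size should be enough even for 1000 requests.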

Upvotes: 3
