Tomas C

Reputation: 37

PLINQ iteration of looping enumerator causes deadlock

I have a simple program that iterates over an endless enumerable implemented as a feedback enumerator. I have implemented this with both TPL and PLINQ. Both examples lock up after a predictable number of iterations: 8 for PLINQ and 3 for TPL. If the code is executed without TPL/PLINQ, it runs fine.

I have implemented the enumerator in both a non-threadsafe and a threadsafe way. The former can be used if the degree of parallelism is restricted to one (as is the case in the examples). The non-threadsafe enumerator is very simple and does not rely on any 'fancy' .NET library classes. If I increase the degree of parallelism, the number of iterations performed before the deadlock increases, e.g. for PLINQ the number of iterations is 8 * the degree of parallelism.

Here are the enumerable implementations:
Enumerable (non-threadsafe)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private T _value;
    private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false);

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            _releaseValueEvent.WaitOne();
            yield return _value;
        }
    }

    public void OnNext(T value)
    {
        _value = value;
        _releaseValueEvent.Set();
    }
}

Enumerable (threadsafe)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>();

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            yield return _blockingCollection.Take();
        }
    }

    public void OnNext(T value)
    {
        _blockingCollection.Add(value);
    }
}

PLINQ Example:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    enumerable
        // Do and ForEach are extension methods from the System.Interactive (Ix.NET) package
        .Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"))
        .AsParallel()
        .WithDegreeOfParallelism(1)
        .ForEach
        (
            i =>
            {
                Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
                enumerable.OnNext(i + 1);
            }
        );
}

TPL Example:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    Parallel.ForEach
    (
        enumerable,
        new ParallelOptions { MaxDegreeOfParallelism = 1 },
        i =>
        {
            Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
            enumerable.OnNext(i+1);
        }
    );
}

Based on my analysis of the call stack, it appears that the deadlock occurs in a partitioner-related method in both PLINQ and TPL, but I am not sure how to interpret this.

Through trial and error I have found that wrapping the PLINQ enumerable in Partitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering) fixes the problem, but I am not sure why the deadlock occurs.
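For reference, the workaround looks like this (a sketch based on the TPL example above, assuming the threadsafe SimpleEnumerable<T>; like the original, it loops indefinitely because the enumerable is endless):

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class Workaround
{
    public static void Main()
    {
        var enumerable = new SimpleEnumerable<int>();
        enumerable.OnNext(0);

        // NoBuffering makes the partitioner take items from the source one at
        // a time with no intermediate storage, so MoveNext is never called for
        // an item that the feedback loop has not produced yet.
        var partitioner = Partitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = 1 },
            i =>
            {
                Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
                enumerable.OnNext(i + 1);
            });
    }
}
```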

I would be very interested in finding out the root cause of the bug.

Note that this is a contrived example. I am not looking for a critique of the code, but rather an explanation of why the deadlock occurs. Specifically, in the PLINQ example, if the .AsParallel() and .WithDegreeOfParallelism(1) lines are commented out, the code works just fine.

Upvotes: 2

Views: 268

Answers (1)

Servy

Reputation: 203811

You don't actually have a logical sequence of values, so trying to create an IEnumerable in the first place simply doesn't make any sense. Additionally, you should almost certainly not be trying to create an IEnumerator that can be consumed by multiple threads. There lies madness, simply because the interface that IEnumerator exposes isn't really designed for that. You can, however, create an IEnumerator that will only ever be used by a single thread and that computes the data to return from an underlying data source used by multiple threads, as that's rather different.

If you simply want to create a producer and a consumer that run in different threads, don't create your own "wrapper" around BlockingCollection; just use BlockingCollection directly. Have the producer add to it, and the consumer read from it. The consumer can use GetConsumingEnumerable if it just wants to iterate over items while taking them (a common thing to want to do).
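A minimal sketch of that approach (a hypothetical producer/consumer pair, not the asker's feedback loop):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var queue = new BlockingCollection<int>();

// Producer: adds items, then signals that no more items are coming.
var producer = Task.Run(() =>
{
    for (var i = 0; i < 5; i++)
        queue.Add(i);
    queue.CompleteAdding();
});

// Consumer: blocks until items arrive; the loop ends once CompleteAdding
// has been called and the collection is drained.
foreach (var item in queue.GetConsumingEnumerable())
    Console.WriteLine(item); // prints 0 through 4

producer.Wait();
```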

Upvotes: 2
