Eric Jorgensen

Parallel.ForEach hangs after 3 yield returns?

I'm hitting a problem where the action in Parallel.ForEach is sometimes not called. I've created a simple toy program that shows the problem in a single-threaded case:

class Program
{
    static void Main(string[] args) 
    {
        // Any value > 3 here causes Parallel.ForEach to hang on the yield return
        int workCount = 4;
        bool inProcess = false;

        System.Collections.Generic.IEnumerable<int> getWorkItems()
        {
            while (workCount > 0)
            {
                if (!inProcess)
                {
                    inProcess = true;
                    System.Console.WriteLine($"    Returning work: {workCount}");
                    yield return workCount;
                }
            }
        }

        System.Threading.Tasks.Parallel.ForEach(getWorkItems(),
            new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 1 },
            (workItem) =>
            {
                System.Console.WriteLine($"      Parallel start:  {workItem}");
                workCount--;
                System.Console.WriteLine($"      Parallel finish: {workItem}");
                inProcess = false;
            });

        System.Console.WriteLine($"=================== Finished ===================\r\n");
    }
}

The output of this program is:

Returning work: 4
  Parallel start:  4
  Parallel finish: 4
Returning work: 3
  Parallel start:  3
  Parallel finish: 3
Returning work: 2
  Parallel start:  2
  Parallel finish: 2
Returning work: 1

... and it hangs right there. The action is never called for 1. What is going on here?

--------------------------- EDIT: More detailed example -----------------------

Here is the same program with more detailed output plus some locks to protect the shared values:

// Needs: using System.Threading; (for Thread and Interlocked)
static object lockOnMe = new object();
static void Run()
{
    System.Console.WriteLine($"Starting ThreadId: {Thread.CurrentThread.ManagedThreadId}");

    // Any value > 3 here causes Parallel.ForEach to hang on the yield return
    int workCount = 40;
    bool inProcess = false;

    System.Collections.Generic.IEnumerable<int> getWorkItems()
    {
        while (workCount > 0)
        {
            lock(lockOnMe)
            {
                if (!inProcess)
                {
                    inProcess = true;
                    System.Console.WriteLine($"    Returning work: {workCount} ThreadId: {Thread.CurrentThread.ManagedThreadId}");
                    yield return workCount;
                }
            }

            Thread.Sleep(100);
            System.Console.Write($".");
        }
    }

    System.Threading.Tasks.Parallel.ForEach(getWorkItems(),
    new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 1 },
    (workItem) =>
    {
        lock(lockOnMe)
        {
            System.Console.WriteLine($"      Parallel start:  {workItem}  ThreadId: {Thread.CurrentThread.ManagedThreadId}");
            Interlocked.Decrement(ref workCount);
            System.Console.WriteLine($"      Parallel finish: {workItem}");
            inProcess = false;

        }
    });

    System.Console.WriteLine($"=================== Finished ===================\r\n");
}

output:

Starting ThreadId: 1
Returning work: 40 ThreadId: 1
  Parallel start:  40  ThreadId: 1
  Parallel finish: 40
Returning work: 39 ThreadId: 1
  Parallel start:  39  ThreadId: 1
  Parallel finish: 39
Returning work: 38 ThreadId: 1
  Parallel start:  38  ThreadId: 1
  Parallel finish: 38
Returning work: 37 ThreadId: 1
......................


Answers (2)

rerun

You are getting into a state where you return workCount on one thread after inProcess has been set to false on another, which puts you in an infinite loop. The parallelism limit applies to the items returned from the enumerable; it does not mean the producer and the consumers are isolated from each other. If you want this to work, you will need to put a lock around every place you read or write inProcess or workCount.

If you raised the degree of parallelism, even the work counts could be wrong.

Edit

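The reason this doesn't work is that the partitioner Parallel.ForEach creates by default for an IEnumerable allows buffering. If you create the partitioner yourself and disallow buffering, this works as expected. Basically, there is a heuristic in the partitioner that runs ahead and caches items from the IEnumerable: it hands them to the worker in chunks, pulling a whole chunk before running the body on any item in it, and the chunk size starts at a single item and grows as enumeration proceeds. That is why the first few items get through. As soon as a chunk needs a second item, getWorkItems() is asked for more while inProcess is still true, and because the body that would reset it cannot run until the chunk is filled, the iterator spins forever.

If you want it to work as expected, do the following: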

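    // Needs: using System.Collections.Concurrent; (for Partitioner and EnumerablePartitionerOptions)
    // Assumes getWorkItems(), workCount and inProcess from the question are now static
    // members of the class rather than locals of Main.
    private static void Main(string[] args)
    {
        // NoBuffering: the partitioner pulls one item at a time from getWorkItems(), only when
        // the worker is ready to process it, instead of filling a chunk ahead of the body.
        var partitioner = Partitioner.Create(getWorkItems(), EnumerablePartitionerOptions.NoBuffering);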

        System.Threading.Tasks.Parallel.ForEach(partitioner,
            new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 1  },
            (workItem) =>
            {
                System.Console.WriteLine($"      Parallel start:  {workItem}");
                workCount--;
                System.Console.WriteLine($"      Parallel finish: {workItem}");
                inProcess = false;
            });

        System.Console.WriteLine($"=================== Finished ===================\r\n");
        var s = System.Console.ReadLine();
    }
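To see the run-ahead described in this answer, here is a minimal sketch that records the order of events with a single worker: 'P' each time the partitioner pulls an item from the iterator and 'B' each time the body runs. It runs once with EnumerablePartitionerOptions.None (the default options) and once with NoBuffering. The BufferingTrace class, its Trace method and the P/B format are hypothetical, made up for this illustration; only Partitioner.Create, EnumerablePartitionerOptions and Parallel.ForEach are real APIs. With NoBuffering the trace should strictly alternate; with the default options you should see consecutive Ps once the chunk size grows.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

static class BufferingTrace
{
    // Hypothetical helper: feeds `count` items through Parallel.ForEach with a single worker
    // and returns a trace with 'P' for every pull from the iterator and 'B' for every body call.
    public static string Trace(EnumerablePartitionerOptions options, int count)
    {
        var log = new StringBuilder();
        var gate = new object();

        IEnumerable<int> Source()
        {
            for (int i = 0; i < count; i++)
            {
                lock (gate) log.Append('P'); // the partitioner asked for one more item
                yield return i;
            }
        }

        Parallel.ForEach(
            Partitioner.Create(Source(), options),
            new ParallelOptions { MaxDegreeOfParallelism = 1 },
            item => { lock (gate) log.Append('B'); }); // the body runs on one item

        return log.ToString();
    }

    static void Main()
    {
        Console.WriteLine("Default:     " + Trace(EnumerablePartitionerOptions.None, 20));
        Console.WriteLine("NoBuffering: " + Trace(EnumerablePartitionerOptions.NoBuffering, 20));
    }
}
```

In the question's program, the first place the default trace shows two Ps in a row is exactly where it hangs: the second pull spins because inProcess is still true.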


Eric Jorgensen

Not a full answer, but here is what I've learned so far:

Parallel.ForEach does not behave like a traditional multithreaded approach, which would have no trouble sharing data the way the example code does. Even when the shared data is protected against concurrent access, there appears to be some optimization in the ForEach logic that doesn't work well when parallel threads need to be interlocked (e.g., processing objects in sequential order).

To get a picture of the weirdness happening inside Parallel.ForEach, try running this snippet:

// Needs: using System.Threading; (for Thread.Sleep)
static void Run()
{
    System.Collections.Generic.IEnumerable<int> getWorkItems()
    {
        int workCount = 9999;
        while (workCount > 0)
        {
            System.Console.Write($"R");
            yield return workCount--;
            Thread.Sleep(10);
        }
    }

    System.Threading.Tasks.Parallel.ForEach(
        getWorkItems(),
        (workItem) => System.Console.Write($"."));
}

output:

R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.R.
RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR
..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..RR..
RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR..
..RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR
....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RR
RR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....RRRR....
RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR......
..RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR....
....RRRRRRRR........RRRRRRRR........R.RRRRRRRR........RRRRRRRR........RRRRRRRR
........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRR
RR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRR
RRRR........R.RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........
RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR......
..RRRRRRRR........RRRRRRRR........RRRRRRRR........RRRRRRRR........R.RRRRRRRR..
......RRRRRRRRRRRR...

... and so on. I don't have an explanation for this behavior, but my guess is that the code consuming the iterator is trying to optimize how much of the input it buffers. In any event, this behavior wreaks havoc on code that is trying to synchronize on shared objects.
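One way to test the buffering guess is to pin the loop to a single worker. Assuming the default chunk partitioning, every run of pulls from the iterator between two body calls is then one chunk, so counting those runs shows how much is buffered at each step. ChunkProbe.ChunkSizes is a hypothetical helper written for this illustration. You should see small chunks first and larger ones later; the exact growth schedule is an internal detail of the .NET partitioner and may differ between runtime versions, so the sketch only prints it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ChunkProbe
{
    // Hypothetical helper: runs `count` items through Parallel.ForEach with the default
    // (buffering) partitioner and a single worker, and returns the size of each chunk,
    // i.e. how many items were pulled from the iterator before the body ran again.
    public static List<int> ChunkSizes(int count)
    {
        var sizes = new List<int>();
        int pulledSinceLastBody = 0;

        IEnumerable<int> Source()
        {
            for (int i = 0; i < count; i++)
            {
                pulledSinceLastBody++; // one more item pulled into the current chunk
                yield return i;
            }
        }

        Parallel.ForEach(
            Source(),
            new ParallelOptions { MaxDegreeOfParallelism = 1 },
            item =>
            {
                // A non-zero counter means a new chunk was fetched since the previous body call.
                if (pulledSinceLastBody > 0)
                {
                    sizes.Add(pulledSinceLastBody);
                    pulledSinceLastBody = 0;
                }
            });

        return sizes;
    }

    static void Main()
    {
        // Exact numbers depend on the runtime's partitioner; expect small chunks first.
        Console.WriteLine(string.Join(", ", ChunkSizes(100)));
    }
}
```

With the default degree of parallelism, several workers grab chunks concurrently, which is why the snippet above prints interleaved runs of R and . rather than one clean sequence.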

Moral of the story: use Parallel.ForEach if your processing is cleanly parallel and does not require synchronization. If you need synchronization, try another approach, such as pre-harvesting the data into an array or writing your own multithreaded handler.
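A minimal sketch of the pre-harvesting approach, assuming the items can be produced up front without depending on the workers' progress. That is not true of the question's getWorkItems(), which waits for the body to clear inProcess, so calling ToArray() on it would hang after the first item. Materializing with ToArray() runs the iterator to completion on the calling thread before any body executes, and Parallel.ForEach then splits the array by index instead of pulling from an enumerator. ProduceItems, Process and the PreHarvest class are hypothetical placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class PreHarvest
{
    // Hypothetical producer: stands in for any iterator whose items do not depend on the consumers.
    static IEnumerable<int> ProduceItems(int count)
    {
        for (int i = count; i > 0; i--)
            yield return i;
    }

    // Hypothetical per-item work; must be safe to run concurrently.
    static int Process(int item) => item * item;

    public static int[] Run(int count)
    {
        // Enumerate everything first, on this thread, before any parallel work starts.
        int[] items = ProduceItems(count).ToArray();

        var results = new ConcurrentBag<int>();
        Parallel.ForEach(items, item => results.Add(Process(item)));

        // ConcurrentBag has no ordering guarantee, so sort for a stable result.
        return results.OrderBy(x => x).ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(5))); // prints "1, 4, 9, 16, 25"
    }
}
```

The trade-off is memory and latency: every item must exist before processing starts, so this fits bounded inputs rather than open-ended work queues.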
