Troy Berg

Reputation: 201

Parallel.ForEach blocked on long iteration

I've been using Parallel.ForEach to do some time-consuming processing on collections of items. The processing is actually handled by an external command line tool and I cannot change that. However, it seems that the Parallel.ForEach will get "stuck" on a long running item from the collection. I've distilled the problem down and can show that Parallel.ForEach is, in fact, waiting for this long one to finish and not allowing any others through. I've written a console app to demonstrate the problem:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace testParallel
{
    class Program
    {
        static int inloop = 0;
        static int completed = 0;
        static void Main(string[] args)
        {
            // initialize an array of integers to hold the wait duration (in milliseconds)
            var items = Enumerable.Repeat(10, 1000).ToArray();
            
            // set one of the items to 10 seconds
            items[50] = 10000;


            // Initialize our line for reporting status
            Console.Write(0.ToString("000") + " Threads, " + 0.ToString("000") + " completed");

            // Start the loop in a task (to avoid SO answers having to do with the Parallel.ForEach call, itself, not being parallel)
            var t = Task.Factory.StartNew(() => Process(items));

            // Wait for the operations to complete
            t.Wait();

            // Report finished
            Console.WriteLine("\nDone!");
        }

        static void Process(int[] items)
        {
            // SpinWait (not sleep or yield or anything) for the specified duration
            Parallel.ForEach(items, (msToWait) =>
            {
                // increment the counter for how many threads are in the loop right now
                System.Threading.Interlocked.Increment(ref inloop);

                // determine at what time we should stop spinning
                var e = DateTime.Now + new TimeSpan(0, 0, 0, 0, msToWait);
                
                // spin until the target time
                while (DateTime.Now < e) /* no body -- just a hard loop */;
                
                // count another completed
                System.Threading.Interlocked.Increment(ref completed);

                // we're done with this iteration
                System.Threading.Interlocked.Decrement(ref inloop);

                // report status
                Console.Write("\r" + inloop.ToString("000") + " Threads, " + completed.ToString("000") + " completed");

            });
        }
    }
}

Basically, I make an array of int to store the number of milliseconds each operation takes. I set them all to 10 except for one, which I set to 10000 (so, 10 seconds). I kick off the Parallel.ForEach in a task and process each integer in a hard spin wait (so it shouldn't be yielding or sleeping or anything). On each iteration, I report how many iterations are in the body of the loop right now and how many have completed. Mostly, it goes along fine. However, toward the end (time-wise), it reports "001 Threads, 987 completed" and stays there until the long item finishes.

My question is: why doesn't it use the other 7 cores to work on the remaining 13 "jobs"? This one long-running iteration should not keep it from processing other elements in the collection, right?

This example happens to be a fixed collection, but it could easily be set to be an enumerable. We wouldn't want to stop fetching the next item in the enumerable just because one was taking a long time.

Upvotes: 4

Views: 3134

Answers (1)

Troy Berg

Reputation: 201

I found the answer (or at least, an answer). It has to do with the chunk partitioning. The SO answer here got it for me. So basically, at the top of my "Process" function, if I change from this:

        static void Process(int[] items)
        {
            Parallel.ForEach(items, (msToWait) => { ... });
        }

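to this (Partitioner and EnumerablePartitionerOptions live in System.Collections.Concurrent, so that namespace needs a using directive):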

        static void Process(int[] items)
        {
            var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);
            Parallel.ForEach(partitioner, (msToWait) => { ... });
        }

it grabs the work one item at a time. For the more typical Parallel.ForEach case, where the body doesn't take more than a second, I can certainly see the point of handing out the work in chunks. In my use case, however, each iteration can take anywhere from half a second to 5 hours. I certainly would not want a bunch of 10-second items to be blocked behind one 5-hour item. So, in this case, the overhead of "one-at-a-time" is well worth it.
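For completeness, here is a minimal, self-contained sketch of the change described above. The class name NoBufferingSketch and the helper ProcessOneAtATime are made up for illustration, and a Stopwatch stands in for the DateTime comparison from the question. The only part taken from the answer is the Partitioner.Create call with EnumerablePartitionerOptions.NoBuffering, which, as far as I know, needs .NET Framework 4.5 or later.

```csharp
using System.Collections.Concurrent; // Partitioner, EnumerablePartitionerOptions
using System.Diagnostics;            // Stopwatch
using System.Threading;              // Interlocked
using System.Threading.Tasks;        // Parallel

static class NoBufferingSketch
{
    // Hypothetical helper (not in the original post): busy-waits for each
    // duration in items and returns how many iterations completed.
    public static int ProcessOneAtATime(int[] items)
    {
        int completed = 0;

        // NoBuffering asks the partitioner to hand out one element at a time
        // rather than letting a worker claim a chunk of elements up front.
        var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, msToWait =>
        {
            // Spin (no sleep, no yield) until the requested duration has elapsed.
            var sw = Stopwatch.StartNew();
            while (sw.ElapsedMilliseconds < msToWait) { /* hard loop */ }

            // Count this iteration as done.
            Interlocked.Increment(ref completed);
        });

        // Parallel.ForEach has returned, so every iteration has finished.
        return completed;
    }
}
```

Passing the array from the question (1000 entries of 10 with items[50] = 10000) to NoBufferingSketch.ProcessOneAtATime should let the short items drain while one thread is still busy with the long one. The cost is more synchronization per element, which only matters when the loop body is very cheap.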

Upvotes: 5
