David Alexander

Reputation: 358

Binding source thread in PLINQ

I have a computation that I'm parallelizing using PLINQ as follows:

Here's where it gets interesting: due to constraints on the file reader type that provides the source, I need the source to be enumerated entirely on the initial (main) thread, not on the parallel workers. However, it seems the source is actually enumerated on the worker threads.

My question is: Is there a straightforward way to modify this code to bind the enumeration of the source to the initial thread, while farming out the heavy work to the parallel workers? Keep in mind that just doing an eager .ToList() before the AsParallel() is not an option here, as the data stream coming from the file is massive.

Here is some example code that demonstrates the problem as I see it:

using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;

public class PlinqTest
{
    private static string FormatItems<T>(IEnumerable<T> source)
    {
        return String.Format("[{0}]", String.Join(";", source));
    }

    public static void Main()
    {
        var expectedThreadIds = new[] { Thread.CurrentThread.ManagedThreadId };

        var threadIds = Enumerable.Range(1, 1000)
                .Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
                .AsParallel()
                .WithDegreeOfParallelism(8)
                .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                .AsOrdered()
                .Select(x => x)                                    // (2)
                .ToArray();

        // In the computation above, the lambda in (1) is a
        // stand-in for the file-reading operation that we
        // want to be bound to the main thread, while the
        // lambda in (2) is a stand-in for the "expensive
        // computation" that we want to be farmed out to the
        // parallel worker threads.  In fact, (1) is being
        // executed on all threads, as can be seen from the
        // output.

        Console.WriteLine("Expected thread IDs: {0}",
                          FormatItems(expectedThreadIds));
        Console.WriteLine("Found thread IDs: {0}",
                          FormatItems(threadIds.Distinct()));
    }
}

Example output I get is:

Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]

Upvotes: 4

Views: 373

Answers (2)

Theodor Zoulias

Reputation: 43485

You could use the OffloadQueryEnumeration method below, which ensures that the enumeration of the source sequence will occur on the same thread that enumerates the resulting IEnumerable<TResult>. The querySelector is a delegate that converts a proxy of the source sequence to a ParallelQuery<T>. This query is enumerated internally on a ThreadPool thread, but the output values are surfaced back on the current thread.

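// Namespaces needed by this method. It is an extension method,
// so it must be declared inside a static class.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>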
/// Enumerates the source sequence on the current thread, and enumerates
/// the projected query on a ThreadPool thread.
/// </summary>
public static IEnumerable<TResult> OffloadQueryEnumeration<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<IEnumerable<TSource>, IEnumerable<TResult>> querySelector)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(querySelector);
    object locker = new();
    (TSource Value, bool HasValue) input = default; bool inputCompleted = false;
    (TResult Value, bool HasValue) output = default; bool outputCompleted = false;
    using IEnumerator<TSource> sourceEnumerator = source.GetEnumerator();

    IEnumerable<TSource> GetSourceProxy()
    {
        while (true)
        {
            TSource sourceItem;
            lock (locker)
            {
                while (true)
                {
                    if (inputCompleted || outputCompleted) yield break;
                    if (input.HasValue) break;
                    Monitor.Wait(locker);
                }
                sourceItem = input.Value;
                input = default; Monitor.PulseAll(locker);
            }
            yield return sourceItem;
        }
    }

    IEnumerable<TResult> query = querySelector(GetSourceProxy());
    Task outputReaderTask = Task.Run(() =>
    {
        try
        {
            foreach (TResult result in query)
            {
                lock (locker)
                {
                    while (true)
                    {
                        if (outputCompleted) return;
                        if (!output.HasValue) break;
                        Monitor.Wait(locker);
                    }
                    output = (result, true); Monitor.PulseAll(locker);
                }
            }
        }
        finally
        {
            lock (locker) { outputCompleted = true; Monitor.PulseAll(locker); }
        }
    });

    // Main loop
    List<Exception> exceptions = new();
    while (true)
    {
        TResult resultItem;
        lock (locker)
        {
            // Inner loop
            while (true)
            {
                if (output.HasValue)
                {
                    resultItem = output.Value;
                    output = default; Monitor.PulseAll(locker);
                    goto yieldResult;
                }
                if (outputCompleted) goto exitMainLoop;
                if (!inputCompleted && !input.HasValue)
                {
                    // Fill the empty input slot, by reading the enumerator.
                    try
                    {
                        if (sourceEnumerator.MoveNext())
                            input = (sourceEnumerator.Current, true);
                        else
                            inputCompleted = true;
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                        inputCompleted = true;
                    }
                    Monitor.PulseAll(locker); continue;
                }
                Monitor.Wait(locker);
            }
        }
    yieldResult:
        bool yieldOK = false;
        try { yield return resultItem; yieldOK = true; }
        finally
        {
            if (!yieldOK)
            {
                // The consumer stopped enumerating prematurely
                lock (locker) { outputCompleted = true; Monitor.PulseAll(locker); }
                Task.WhenAny(outputReaderTask).Wait();
            }
        }
    }
exitMainLoop:

    // Propagate possible exceptions
    try { outputReaderTask.GetAwaiter().GetResult(); }
    catch (OperationCanceledException) { throw; }
    catch (AggregateException aex) { exceptions.AddRange(aex.InnerExceptions); }

    if (exceptions.Count > 0)
        throw new AggregateException(exceptions);
}

This method uses the Monitor.Wait/Monitor.PulseAll mechanism (tutorial) to synchronize the transfer of values from one thread to the other.
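To make the handoff pattern easier to follow in isolation, here is a minimal sketch of a one-slot transfer between two threads using Monitor.Wait and Monitor.PulseAll. It is not part of the method above: the HandoffDemo class, its Run method, and the variables slot, hasValue and completed are hypothetical names chosen for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of OffloadQueryEnumeration.
public static class HandoffDemo
{
    // Passes the numbers 1..count from the calling thread to a
    // ThreadPool thread through a single shared slot.
    public static List<int> Run(int count)
    {
        object locker = new object();
        int slot = 0;
        bool hasValue = false;   // true while the slot holds an unread item
        bool completed = false;  // true once the producer has finished
        var received = new List<int>();

        // Consumer: drains the slot on a ThreadPool thread.
        Task consumer = Task.Run(() =>
        {
            while (true)
            {
                int item;
                lock (locker)
                {
                    // Wait releases the lock while blocked and
                    // reacquires it before returning.
                    while (!hasValue && !completed) Monitor.Wait(locker);
                    if (!hasValue) return; // completed, nothing left to read
                    item = slot;
                    hasValue = false;
                    Monitor.PulseAll(locker); // tell the producer the slot is free
                }
                received.Add(item);
            }
        });

        // Producer: the calling thread fills the slot one item at a time.
        for (int i = 1; i <= count; i++)
        {
            lock (locker)
            {
                while (hasValue) Monitor.Wait(locker); // wait for an empty slot
                slot = i;
                hasValue = true;
                Monitor.PulseAll(locker); // tell the consumer an item is ready
            }
        }
        lock (locker) { completed = true; Monitor.PulseAll(locker); }

        consumer.Wait();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(";", Run(5))); // prints 1;2;3;4;5
    }
}
```

Each side re-checks its condition in a loop after Monitor.Wait returns, because a pulse only signals that the shared state changed, not that the particular condition a thread is waiting for now holds.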

Usage example:

int[] threadIds = Enumerable
    .Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .OffloadQueryEnumeration(proxy => proxy
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(8)
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .Select(x => x)
    )
    .ToArray();

Online demo.

The OffloadQueryEnumeration is a fairly intricate method. It continuously juggles three threads:

  1. The current thread that both enumerates the source sequence, and consumes the PLINQ-generated elements, alternating between the two operations.
  2. The ThreadPool thread (outputReaderTask) that enumerates the PLINQ-generated sequence.
  3. The worker thread that is tasked by the PLINQ machinery to fetch the next item from the GetSourceProxy() iterator. This thread is not the same all the time, but at any given moment only one worker thread at most is assigned this task.

So lots of things are going on, and there are lots of opportunities for hidden bugs to go undetected. This is the kind of API that would require writing a dozen tests to assert correctness in the numerous possible scenarios (for example failure in the source sequence, failure in the PLINQ operators, failure in the consumer, cancellation, abandoned enumeration etc). I have manually tested some of these scenarios, but I haven't written any automated tests, so use this method with caution.

Upvotes: 0

user1726343

Reputation:

This is fairly straightforward (although perhaps not as concise) if you abandon PLINQ and just use the Task Parallel Library explicitly:

// Limits the parallelism of the "expensive task"
var semaphore = new SemaphoreSlim(8);

var tasks = Enumerable.Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .Select(async x =>
    {
        await semaphore.WaitAsync();
        try
        {
            // Run the "expensive task" on a ThreadPool thread
            return await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
        }
        finally
        {
            // Release the slot even if the task throws
            semaphore.Release();
        }
    });

return Task.WhenAll(tasks).Result;

Note that I'm using Tuple.Create to record both the thread ID coming from the main thread and the thread ID coming from the spawned task. From my test, the former is always the same for every tuple, while the latter varies, which is as it should be.

The semaphore makes sure that the degree of parallelism never goes above 8 (although with the inexpensive task of creating a tuple, reaching that limit isn't very likely anyway). Once 8 tasks are running, any new tasks will wait until there are spots available on the semaphore.
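One way to check the throttling claim is to count how many callers hold the semaphore at once. The following is a rough sketch under my own assumptions, not the answer's code: ThrottleDemo, MeasurePeakAsync, and the Thread.Sleep stand-in for the expensive work are all hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class used only to observe the semaphore limit.
public static class ThrottleDemo
{
    // Starts itemCount work items throttled by a SemaphoreSlim and
    // returns the highest number of simultaneous semaphore holders seen.
    public static async Task<int> MeasurePeakAsync(int itemCount, int limit)
    {
        var semaphore = new SemaphoreSlim(limit);
        object gate = new object();
        int running = 0;
        int peak = 0;

        var tasks = Enumerable.Range(0, itemCount).Select(async _ =>
        {
            await semaphore.WaitAsync();
            try
            {
                lock (gate) { running++; if (running > peak) peak = running; }
                // Stand-in for the expensive computation.
                await Task.Run(() => Thread.Sleep(10));
                lock (gate) { running--; }
            }
            finally
            {
                semaphore.Release();
            }
        }).ToList(); // ToList starts every work item exactly once

        await Task.WhenAll(tasks);
        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakAsync(50, 8).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency within limit: {0}", peak <= 8);
    }
}
```

Note that the semaphore only throttles the expensive part. The lambda itself is still invoked for every source element as soon as the query is enumerated, so the whole source is read up front and one pending task is created per element.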

Upvotes: 2
