Reputation: 358
I have a computation that I'm parallelizing using PLINQ as follows: a source IEnumerable<T> source provides objects read from a file. I have a heavyweight computation HeavyComputation that I need to do on each T, and I want these farmed out across threads, so I am using PLINQ like: AsParallel().Select(HeavyComputation)
Here's where it gets interesting: due to constraints on the file reader type that provides the source, I need the source to be enumerated on the initial thread, not on the parallel workers. I need the full evaluation of the source to be bound to the main thread. However, it seems the source is actually enumerated on worker threads.
My question is: is there a straightforward way to modify this code to bind the enumeration of the source to the initial thread, while farming out the heavy work to the parallel workers? Keep in mind that just doing an eager .ToList() before the AsParallel() is not an option here, as the data stream coming from the file is massive.
Here is some example code that demonstrates the problem as I see it:
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;

public class PlinqTest
{
    private static string FormatItems<T>(IEnumerable<T> source)
    {
        return String.Format("[{0}]", String.Join(";", source));
    }

    public static void Main()
    {
        var expectedThreadIds = new[] { Thread.CurrentThread.ManagedThreadId };

        var threadIds = Enumerable.Range(1, 1000)
            .Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
            .AsParallel()
            .WithDegreeOfParallelism(8)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .AsOrdered()
            .Select(x => x) // (2)
            .ToArray();

        // In the computation above, the lambda in (1) is a
        // stand-in for the file-reading operation that we
        // want to be bound to the main thread, while the
        // lambda in (2) is a stand-in for the "expensive
        // computation" that we want to be farmed out to the
        // parallel worker threads. In fact, (1) is being
        // executed on all threads, as can be seen from the
        // output.

        Console.WriteLine("Expected thread IDs: {0}",
            FormatItems(expectedThreadIds));
        Console.WriteLine("Found thread IDs: {0}",
            FormatItems(threadIds.Distinct()));
    }
}
Example output I get is:
Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]
Upvotes: 4
Views: 373
Reputation: 43485
You could use the OffloadQueryEnumeration method below, which ensures that the enumeration of the source sequence will occur on the same thread that enumerates the resulting IEnumerable<TResult>. The querySelector is a delegate that converts a proxy of the source sequence to a ParallelQuery<T>. This query is enumerated internally on a ThreadPool thread, but the output values are surfaced back on the current thread.
/// <summary>
/// Enumerates the source sequence on the current thread, and enumerates
/// the projected query on a ThreadPool thread.
/// </summary>
public static IEnumerable<TResult> OffloadQueryEnumeration<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<IEnumerable<TSource>, IEnumerable<TResult>> querySelector)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(querySelector);

    object locker = new();
    (TSource Value, bool HasValue) input = default; bool inputCompleted = false;
    (TResult Value, bool HasValue) output = default; bool outputCompleted = false;
    using IEnumerator<TSource> sourceEnumerator = source.GetEnumerator();

    IEnumerable<TSource> GetSourceProxy()
    {
        while (true)
        {
            TSource sourceItem;
            lock (locker)
            {
                while (true)
                {
                    if (inputCompleted || outputCompleted) yield break;
                    if (input.HasValue) break;
                    Monitor.Wait(locker);
                }
                sourceItem = input.Value;
                input = default; Monitor.PulseAll(locker);
            }
            yield return sourceItem;
        }
    }

    IEnumerable<TResult> query = querySelector(GetSourceProxy());

    Task outputReaderTask = Task.Run(() =>
    {
        try
        {
            foreach (TResult result in query)
            {
                lock (locker)
                {
                    while (true)
                    {
                        if (outputCompleted) return;
                        if (!output.HasValue) break;
                        Monitor.Wait(locker);
                    }
                    output = (result, true); Monitor.PulseAll(locker);
                }
            }
        }
        finally
        {
            lock (locker) { outputCompleted = true; Monitor.PulseAll(locker); }
        }
    });

    // Main loop
    List<Exception> exceptions = new();
    while (true)
    {
        TResult resultItem;
        lock (locker)
        {
            // Inner loop
            while (true)
            {
                if (output.HasValue)
                {
                    resultItem = output.Value;
                    output = default; Monitor.PulseAll(locker);
                    goto yieldResult;
                }
                if (outputCompleted) goto exitMainLoop;
                if (!inputCompleted && !input.HasValue)
                {
                    // Fill the empty input slot, by reading the enumerator.
                    try
                    {
                        if (sourceEnumerator.MoveNext())
                            input = (sourceEnumerator.Current, true);
                        else
                            inputCompleted = true;
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                        inputCompleted = true;
                    }
                    Monitor.PulseAll(locker); continue;
                }
                Monitor.Wait(locker);
            }
        }
    yieldResult:
        bool yieldOK = false;
        try { yield return resultItem; yieldOK = true; }
        finally
        {
            if (!yieldOK)
            {
                // The consumer stopped enumerating prematurely.
                lock (locker) { outputCompleted = true; Monitor.PulseAll(locker); }
                Task.WhenAny(outputReaderTask).Wait();
            }
        }
    }
exitMainLoop:
    // Propagate possible exceptions
    try { outputReaderTask.GetAwaiter().GetResult(); }
    catch (OperationCanceledException) { throw; }
    catch (AggregateException aex) { exceptions.AddRange(aex.InnerExceptions); }
    if (exceptions.Count > 0)
        throw new AggregateException(exceptions);
}
This method uses the Monitor.Wait/Monitor.Pulse mechanism (tutorial) in order to synchronize the transfer of values from one thread to the other.
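To illustrate that pattern in isolation, here is a minimal sketch of my own (not part of the method above; the MonitorHandoffDemo class and all names are illustrative) of a single-slot handoff between two threads using Monitor.Wait/Monitor.PulseAll, the same mechanism the input and output slots rely on:

using System;
using System.Threading;
using System.Threading.Tasks;

public class MonitorHandoffDemo
{
    public static void Main()
    {
        object locker = new object();
        (int Value, bool HasValue) slot = default;
        bool completed = false;

        // Producer: fills the single slot, then signals completion.
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= 5; i++)
            {
                lock (locker)
                {
                    while (slot.HasValue) Monitor.Wait(locker); // slot still full
                    slot = (i, true);
                    Monitor.PulseAll(locker); // wake the consumer
                }
            }
            lock (locker) { completed = true; Monitor.PulseAll(locker); }
        });

        // Consumer: drains the slot on the current thread.
        while (true)
        {
            int item;
            lock (locker)
            {
                while (!slot.HasValue && !completed) Monitor.Wait(locker);
                if (!slot.HasValue) break; // producer finished
                item = slot.Value;
                slot = default;
                Monitor.PulseAll(locker); // wake the producer
            }
            Console.WriteLine(item); // prints 1..5 in order
        }
        producer.Wait();
    }
}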
Usage example:
int[] threadIds = Enumerable
    .Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .OffloadQueryEnumeration(proxy => proxy
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(8)
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .Select(x => x)
    )
    .ToArray();
The OffloadQueryEnumeration is a significantly intricate method. It is juggling non-stop three threads:

1. The consumer thread (the current thread), which enumerates the source sequence and consumes the PLINQ-generated elements, alternating between the two operations.
2. A ThreadPool thread (outputReaderTask) that enumerates the PLINQ-generated sequence.
3. A PLINQ worker thread that enumerates the GetSourceProxy() iterator. This thread is not the same all the time, but at any given moment at most one worker thread is assigned this task.

So lots of things are going on, and there are lots of opportunities for hidden bugs to pass undetected. This is the kind of API that would require writing a dozen tests to assert the correctness of the numerous possible scenarios (for example failure in the source sequence, failure in the PLINQ operators, failure in the consumer, cancellation, abandoned enumeration etc.). I have tested some of these scenarios manually, but I haven't written any tests, so use this method with caution.
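As an example of one such scenario, here is a hypothetical spot-check (not an actual test from this answer; it assumes OffloadQueryEnumeration is hosted in an accessible static class) showing that a failure in the source sequence surfaces on the consuming thread:

using System;
using System.Collections.Generic;
using System.Linq;

public static class OffloadSpotCheck
{
    // A source that fails midway through enumeration.
    static IEnumerable<int> FailingSource()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("Source failed");
    }

    public static void Main()
    {
        try
        {
            int[] results = FailingSource()
                .OffloadQueryEnumeration(proxy => proxy.AsParallel().Select(x => x * 10))
                .ToArray();
        }
        catch (AggregateException aex)
        {
            // The source's exception is collected by the main loop and
            // rethrown here, wrapped in an AggregateException.
            Console.WriteLine(aex.InnerException.Message); // Source failed
        }
    }
}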
Upvotes: 0
Reputation:
This is fairly straightforward (although perhaps not as concise) if you abandon PLINQ and just use the Task Parallel Library explicitly:
// Limits the parallelism of the "expensive task"
var semaphore = new SemaphoreSlim(8);

var tasks = Enumerable.Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .Select(async x =>
    {
        await semaphore.WaitAsync();
        var result = await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
        semaphore.Release();
        return result;
    });

return Task.WhenAll(tasks).Result;
Note that I'm using Tuple.Create to record both the thread ID coming from the main thread and the thread ID coming from the spawned task. From my test, the former is always the same for every tuple, while the latter varies, which is as it should be.

The semaphore makes sure that the degree of parallelism never goes above 8 (although with the inexpensive task of creating a tuple this isn't very likely anyway). If you get to 8, any new tasks will wait until a spot is available on the semaphore.
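For reference, here is a self-contained version of the snippet that you can paste into a console app and run (my own harness, not part of the original answer; I also moved semaphore.Release() into a finally block so a faulted task cannot leak a semaphore slot):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class TplDemo
{
    public static void Main()
    {
        // Limits the parallelism of the "expensive task" to 8.
        var semaphore = new SemaphoreSlim(8);

        // The first Select runs lazily on whichever thread enumerates it;
        // Task.WhenAll below enumerates 'tasks' on the main thread.
        var tasks = Enumerable.Range(1, 1000)
            .Select(x => Thread.CurrentThread.ManagedThreadId)
            .Select(async x =>
            {
                await semaphore.WaitAsync();
                try
                {
                    return await Task.Run(() =>
                        Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
                }
                finally { semaphore.Release(); }
            });

        Tuple<int, int>[] tuples = Task.WhenAll(tasks).Result;

        // Item1 should collapse to the single main-thread ID,
        // while Item2 should span several worker-thread IDs.
        Console.WriteLine("Main-thread IDs: [{0}]",
            string.Join(";", tuples.Select(t => t.Item1).Distinct()));
        Console.WriteLine("Worker IDs: [{0}]",
            string.Join(";", tuples.Select(t => t.Item2).Distinct()));
    }
}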
Upvotes: 2