Florian

Reputation: 4738

Parallel.Foreach + yield return?

I want to process something using a parallel loop like this:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });

}

OK, it works fine. But how can I make the FillLogs method return an IEnumerable?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt; // KO: doesn't compile, 'yield' can't be used inside a lambda
    });

}

EDIT

It seems this is not possible... but I am using something like this:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

But where do I put the cpt.Logs = cpt.GetRawLogs().ToList(); statement?

Upvotes: 38

Views: 19692

Answers (5)

Matas Vaitkevicius

Reputation: 61469

You can use the following extension method:

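using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelExtensions
{
    // Runs 'action' on every element in parallel, then returns the results in the
    // original input order. All the work is done before this method returns.
    public static IEnumerable<T1> OrderedParallel<T, T1>(this IEnumerable<T> list, Func<T, T1> action)
    {
        // Each result is stored together with the index of its source element.
        var unorderedResult = new ConcurrentBag<(long, T1)>();
        Parallel.ForEach(list, (o, state, i) =>
        {
            unorderedResult.Add((i, action.Invoke(o)));
        });
        // Sort by the source index to restore the input order.
        var ordered = unorderedResult.OrderBy(o => o.Item1);
        return ordered.Select(o => o.Item2);
    }
}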

Use it like this:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.OrderedParallel(cpt =>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        return cpt;
    });
}

Hope this will save you some time.
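For comparison, the question's own AsParallel() attempt can produce the same ordered result: the statement from the question goes inside the Select projection, and AsOrdered() keeps the input order. This is a minimal sketch that assumes Logs is a List<string> and GetRawLogs() returns IEnumerable<string>; the question never shows IComputer, so that interface and the FakeComputer class are hypothetical stand-ins.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's IComputer (only the members used here).
public interface IComputer
{
    List<string> Logs { get; set; }
    IEnumerable<string> GetRawLogs();
}

// Hypothetical test double: GetRawLogs fakes a single log line per computer.
public class FakeComputer : IComputer
{
    public FakeComputer(int id) { Id = id; }
    public int Id { get; }
    public List<string> Logs { get; set; }
    public IEnumerable<string> GetRawLogs() => new[] { "log from computer " + Id };
}

public static class ComputerExtensions
{
    public static IEnumerable<IComputer> FillLogs(this IEnumerable<IComputer> computers)
    {
        return computers
            .AsParallel()
            .AsOrdered()   // optional: keep the input order, as OrderedParallel does
            .Select(cpt =>
            {
                // The statement from the question goes inside the projection.
                cpt.Logs = cpt.GetRawLogs().ToList();
                return cpt;
            });
    }
}
```

One design difference: OrderedParallel does all the work before it returns, whereas the PLINQ query only starts running when the caller enumerates the result.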

Upvotes: 1

CSharpBender

Reputation: 31

Although the question is old, I managed to put something together just for fun.

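using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program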
{
    static void Main(string[] args)
    {
        foreach (var message in GetMessages())
        {
            Console.WriteLine(message);
        }
    }


    // Parallel yield
    private static IEnumerable<string> GetMessages()
    {
        int total = 0;
        bool completed = false;
        var batches = Enumerable.Range(1, 100).Select(i => new Computer() { Id = i });
        var qu = new ConcurrentQueue<Computer>();
        Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(batches,
                    () => 0,
                    (item, loop, subtotal) =>
                    {
                        Thread.Sleep(1000);
                        qu.Enqueue(item);
                        return subtotal + 1;
                    },
                    result => Interlocked.Add(ref total, result));
            }
            finally
            {
                completed = true;
            }
        });

        int current = 0;
        while (current < total || !completed)
        {
            SpinWait.SpinUntil(() => current < total || completed);
            if (current == total) yield break;
            current++;
            qu.TryDequeue(out Computer computer);
            yield return $"Completed {computer.Id}";
        }
    }
}

public class Computer
{
    public int Id { get; set; }
}

Compared to Koray's answer, this one really uses all the CPU cores.

Upvotes: 3

Koray

Reputation: 1806

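How about this:

Queue<string> qu = new Queue<string>();
bool finished = false;
Task.Factory.StartNew(() =>
{
    Parallel.ForEach(get_list(), (item) =>
    {
        string itemToReturn = heavyWorkOnItem(item);
        lock (qu)
            qu.Enqueue(itemToReturn);
    });
    finished = true;
});

while (true)
{
    // read the flag before draining, so items queued just before it was set are not lost
    bool done = finished;
    while (true)
    {
        string next;
        lock (qu)
        {
            if (qu.Count == 0) break;
            next = qu.Dequeue();
        }
        // yield outside the lock, so the workers aren't blocked while the caller handles the item
        yield return next;
    }
    if (done) break;
    //maybe a thread sleep here?
}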

Edit: I think this is better:

public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
{
    ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
    bool finished = false;
    AutoResetEvent re = new AutoResetEvent(false);
    Task.Factory.StartNew(() =>
    {
        Parallel.ForEach(source, (item) =>
        {
            qu.Enqueue(func(item));
            re.Set();
        });
        finished = true;
        re.Set();
    });

    while (!finished)
    {
        re.WaitOne();
        TOutput res;
        while (qu.TryDequeue(out res))
            yield return res;
    }

    // Drain whatever was enqueued after the last pass but before 'finished'
    // was seen as true; without this, the final items can be lost.
    TOutput rest;
    while (qu.TryDequeue(out rest))
        yield return rest;
}

Edit 2: I agree with the short "no" answer. This code is useless: you cannot break out of the yield loop.

Upvotes: 0

Fischermaen

Reputation: 12468

I don't want to be offensive, but maybe there is a lack of understanding here. Parallel.ForEach means that the TPL will run the foreach on several threads, according to the available hardware. But that means the work has to be something that can be done in parallel! yield return gives you the opportunity to get values out of a list (or whatever) and hand them back one by one, as they are needed. It avoids the need to first find all items matching the condition and then iterate over them. That is indeed a performance advantage, but it can't be done in parallel.
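To make the "one by one, as they are needed" point concrete, here is a small hypothetical demo (Squares and TakeFirst are illustrative names, not from any library): the iterator body only runs as far as the caller actually pulls.

```csharp
using System;
using System.Collections.Generic;

public static class LazyDemo
{
    // Hypothetical iterator: each square is computed only when the caller asks for
    // the next item; 'onProduce' is invoked once per item actually computed.
    public static IEnumerable<int> Squares(int count, Action onProduce)
    {
        for (int i = 1; i <= count; i++)
        {
            onProduce();
            yield return i * i;
        }
    }

    // Pulls only the first 'n' of 1000 squares and reports how many
    // the iterator actually computed.
    public static (List<int> Items, int Produced) TakeFirst(int n)
    {
        int produced = 0;
        var items = new List<int>();
        if (n <= 0) return (items, produced);

        foreach (int square in Squares(1_000, () => produced++))
        {
            items.Add(square);
            if (items.Count == n) break; // stop pulling: the remaining squares are never computed
        }
        return (items, produced);
    }
}
```

LazyDemo.TakeFirst(3) returns the items 1, 4, 9 with Produced equal to 3; the other 997 squares are never computed. That sequential, pull-driven contract is exactly what a Parallel.ForEach body cannot provide from inside an iterator.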

Upvotes: 5

Marc Gravell

Reputation: 1063619

Short version: no, that isn't possible via an iterator block. The longer version probably involves a synchronized queue/dequeue between the caller's iterator thread (doing the dequeue) and the parallel workers (doing the enqueue). As a side note, logs are usually IO-bound, and parallelising IO-bound work often doesn't work very well.
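The synchronized queue/dequeue version could look roughly like this, using BlockingCollection<T>: the parallel workers Add results, and the caller's iterator drains GetConsumingEnumerable(). This is a minimal, unordered sketch under stated assumptions: ParallelYield and boundedCapacity are hypothetical names, and cancelling the workers when the caller stops early is one design choice among several.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelStreamingExtensions
{
    // Hypothetical helper. Runs 'func' over 'source' with Parallel.ForEach on a
    // background task and yields each result as soon as a worker queues it.
    // Results come back in completion order, not input order.
    public static IEnumerable<TOutput> ParallelYield<TSource, TOutput>(
        this IEnumerable<TSource> source,
        Func<TSource, TOutput> func,
        int boundedCapacity = 64)
    {
        // The bound stops the workers from running arbitrarily far ahead of the caller.
        var queue = new BlockingCollection<TOutput>(boundedCapacity);
        var cts = new CancellationTokenSource();

        Task producer = Task.Run(() =>
        {
            try
            {
                var options = new ParallelOptions { CancellationToken = cts.Token };
                Parallel.ForEach(source, options, item => queue.Add(func(item), cts.Token));
            }
            finally
            {
                // Lets GetConsumingEnumerable finish once the queue is drained.
                queue.CompleteAdding();
            }
        });

        try
        {
            // The caller's thread does the dequeue; the parallel workers do the enqueue.
            foreach (TOutput result in queue.GetConsumingEnumerable())
                yield return result;

            // Rethrows the exception that ended the loop, if any
            // (Parallel.ForEach wraps failures from 'func' in an AggregateException).
            producer.GetAwaiter().GetResult();
        }
        finally
        {
            // If the caller stopped early (break, Take, ...), stop the workers too.
            // queue and cts are deliberately not disposed, so late workers never hit a disposed object.
            cts.Cancel();
        }
    }
}
```

Because the iterator body does not start until the first MoveNext, nothing runs until the caller begins enumerating; with an IO-bound source such as logs, the bound and the degree of parallelism would still need tuning.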

If the caller is going to take some time to consume each item, then there may be some merit to an approach that only processes one log at a time, but does so while the caller is consuming the previous log; i.e. it starts a Task for the next item before the yield, and waits for its completion after the yield... but that is, again, pretty complex. As a simplified example:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        foreach(string s in Get())
        {
            Console.WriteLine(s);
        }
    }

    static IEnumerable<string> Get() {
        var source = new[] {1, 2, 3, 4, 5};
        Task<string> outstandingItem = null;
        Func<object, string> transform = x => ProcessItem((int) x);
        foreach(var item in source)
        {
            var tmp = outstandingItem;

            // note: passed in as "state", not captured, so not a foreach/capture bug
            outstandingItem = new Task<string>(transform, item);
            outstandingItem.Start();

            if (tmp != null) yield return tmp.Result;
        }
        if (outstandingItem != null) yield return outstandingItem.Result;
    }

    static string ProcessItem(int i)
    {
        return i.ToString();
    }
}

Upvotes: 18
