Kyle C

Reputation: 1637

Can I update the collection that Parallel.For is using?

I am running some tasks that each take anywhere from a few seconds to a few minutes. While they run, more data may arrive that needs to be processed by the already running parallel loop. Is it possible to update the collection that Parallel.For is using and have it keep iterating until there are no more objects to retrieve? Here's some example code showing my problem:

[Test]
public void DoesParallelForGetNewEntriesInLoop()
{
    ConcurrentDictionary<int, string> dict = new ConcurrentDictionary<int, string>();
    ConcurrentBag<string> bag = new ConcurrentBag<string>();
    int i = 0;
    // write to dictionary every 10ms simulating new additions
    Timer t = new Timer(callback =>
    {
        dict.TryAdd(i++, "Value" + i);
    }, dict, 0, 10);
    // Add initial values
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);

    Parallel.For(0, dict.Count, (a, state) =>
    {
        string val = string.Empty;
        if (dict.TryGetValue(a, out val))
        {
            bag.Add(val + Environment.NewLine);
        }
        if (i++ == 50)
            state.Stop();
        Thread.Sleep(5000);
    });
    foreach (var item in bag)
    {
        File.AppendAllText("parallelWrite.txt", item);
    }            
}

When I run this the result I get is simply:

Value2
Value1
Value3
Value4

Is there a better approach to doing what I am trying to do here?

Upvotes: 2

Views: 1303

Answers (2)

Michael Humelsine

Reputation: 601

How about using a BlockingCollection and passing GetConsumingEnumerable() to Parallel.ForEach?

BlockingCollection<string> collection = new BlockingCollection<string>();

Parallel.ForEach(collection.GetConsumingEnumerable(), (x) => Console.WriteLine(x));

You can add stuff to the collection using BlockingCollection's Add() method.

There is technically "double locking" going on: Parallel.ForEach takes a lock when it pulls chunks of items from the enumerable, and BlockingCollection was built to support multiple consumers, so it does its own locking as well. If this becomes a performance concern (it very well could), you can implement your own partitioner for the BlockingCollection, since Parallel.ForEach has overloads that accept an OrderablePartitioner or a Partitioner. There is a very good article that describes how here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
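To make that a bit more concrete, here is a rough sketch of a producer adding items while Parallel.ForEach consumes them. The class name, item count, and 10 ms delay are made up for illustration. Note that it does not use the custom partitioner from the linked article; it swaps in the built-in Partitioner.Create overload with EnumerablePartitionerOptions.NoBuffering, which assumes .NET 4.5 or later. The loop only ends once CompleteAdding() has been called and the collection is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class ConsumingLoopSketch
{
    // Processes items that are still being added while the loop runs
    // and returns how many items were handled.
    public static int Run(int itemCount)
    {
        var collection = new BlockingCollection<string>();
        int processed = 0;

        // Hypothetical producer: adds one item every 10 ms, then signals the end.
        Task producer = Task.Run(() =>
        {
            for (int n = 0; n < itemCount; n++)
            {
                collection.Add("Value" + n);
                Thread.Sleep(10);
            }
            // Without this call GetConsumingEnumerable() never finishes
            // and Parallel.ForEach would block forever.
            collection.CompleteAdding();
        });

        // NoBuffering asks the partitioner to hand out one item at a time
        // instead of chunks, so items are not held back waiting for a
        // chunk to fill up.
        var source = Partitioner.Create(
            collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, item =>
        {
            // Real work on the item would go here.
            Interlocked.Increment(ref processed);
        });

        producer.Wait();
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(20));
    }
}
```

If the work per item takes seconds to minutes, as in the question, the locking overhead is probably negligible next to the work itself, so this may be all that is needed.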

Upvotes: 4

eladcon

Reputation: 5825

The from and to parameters of Parallel.For are computed only once, before the loop starts, so items added afterwards are never visited. Use Parallel.ForEach to iterate over new items. I'm not sure what you are trying to achieve, but a better approach might be to put new data in a stack/queue and periodically pop the data off and handle it.
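A small sketch of the first point, with a made-up class name and a ConcurrentQueue standing in for the dictionary from the question: the queue grows while the loop runs, but the number of iterations stays at whatever Count returned when Parallel.For was called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class FixedBoundsSketch
{
    // Returns the number of iterations Parallel.For actually ran.
    public static int Run()
    {
        var queue = new ConcurrentQueue<int>();
        for (int n = 0; n < 3; n++)
        {
            queue.Enqueue(n);
        }

        int iterations = 0;

        // queue.Count is evaluated once, here, and passed in as a plain int.
        Parallel.For(0, queue.Count, index =>
        {
            queue.Enqueue(100 + index); // grows the queue during the loop
            Interlocked.Increment(ref iterations);
        });

        return iterations;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

The loop runs three times even though the queue holds six items afterwards, which matches what the question observed with dict.Count.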

Upvotes: 0
