Mario

Reputation: 14780

Task.Run in a loop: external parameters are changed by the loop

I have this (simplified) code that processes an array of parameters on 100 parallel threads, but the variables x and y get changed by the loop while the threads are still using them. If I run the function with 1 thread, it works.

I also tried putting the parameters in a ConcurrentBag and looping with foreach, but the result is the same: the parameters get mixed up between threads.

List<Task> tasks = new List<Task>();
var listConcurentBag = new ConcurrentBag<int>();
int nThreadCount = 0;

for (x = 0; x < 1000; x++)
    for (y = 0; y < 1000; y++)
    {
        int x1 = x;
        int y2 = y;

        Task t = Task.Run(() =>
        {
            int param1 = x1;
            int param2 = y2;

            // some calculations with param1 and param2

            listConcurentBag.Add(result);
        }); // tasks

        tasks.Add(t);
        nThreadCount++;

        if (nThreadCount == 100) // after 100 threads started, wait
        {
            nThreadCount = 0;
            Task.WaitAll(tasks.ToArray());
            tasks.Clear();
        }
    }

Upvotes: 1

Views: 1098

Answers (2)

Enigmativity

Reputation: 117084

You should use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then you can do this:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
    from r in Observable.Start(() => GetResult(x, y))
    select new { x, y, r };

IDisposable subscription =
    query
        .Buffer(100)
        .Subscribe(results =>
        {
            /* do something with each buffered list of results */
        });

Now this isn't strictly doing the same as your current code, but it is giving you blocks of 100 results as soon as they are available using the maximum capacity of the thread-pool.

You could change it to set the concurrency like this:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
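    // Observable.Start runs its function as soon as it is called, so wrap it in
    // Defer to let Merge decide when each call actually begins.
    select Observable.Defer(() => Observable.Start(() => new { x, y, r = GetResult(x, y) }));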

IDisposable subscription =
    query
        .Merge(maxConcurrent: 100) // limit to 100 threads
        .Buffer(count: 100) // produce 100 results at a time
        .Subscribe(results =>
        {
            /* do something with the list of results */
        });

If you want to stop the code before it naturally completes just call subscription.Dispose();.

Rx does tend to produce much cleaner code, IMHO.

Upvotes: 2

Theodor Zoulias

Reputation: 43553

I have a suggestion for an alternative implementation that you may or may not find suitable for your needs. Instead of processing the tasks in batches of 100, you could express the nested for loops as a single enumerable and then feed it to the built-in method Parallel.ForEach, which handles the parallelism.

private IEnumerable<(int, int)> GetNestedFor()
{
    for (int x = 0; x < 1000; x++)
    {
        for (int y = 0; y < 1000; y++)
        {
            yield return (x, y); // return a ValueTuple<int, int>
        }
    }
}

ThreadPool.SetMinThreads(100, 100);
var options = new ParallelOptions() { MaxDegreeOfParallelism = 100 };
Parallel.ForEach(GetNestedFor(), options, item =>
{
    int param1 = item.Item1;
    int param2 = item.Item2;
    Console.WriteLine($"Processing ({param1}, {param2})");
    Thread.Sleep(100); // Simulate some work
});

Output:

Processing (0, 1)
Processing (0, 2)
Processing (0, 0)
Processing (0, 3)
...
Processing (0, 998)
Processing (0, 997)
Processing (0, 999)
Processing (1, 0)
Processing (1, 1)
...
Processing (999, 999)
Processing (999, 998)
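
If you also need to collect a result per pair, as the ConcurrentBag in the question does, the same approach could be extended roughly like this. This is only a sketch under some assumptions: Calculate is a hypothetical stand-in for the real calculation, the ranges are shrunk to 10 x 10 and the degree of parallelism to 4 so it finishes quickly, and the tuple element names X, Y and Result are my own choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Smaller ranges than the 1000 x 1000 above, so the sketch finishes quickly.
const int Width = 10;
const int Height = 10;

// Thread-safe collection for the results; the order of its items is not defined.
var results = new ConcurrentBag<(int X, int Y, int Result)>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

Parallel.ForEach(GetNestedFor(Width, Height), options, item =>
{
    // item is passed by value to each invocation, so no other iteration can change it.
    int result = Calculate(item.X, item.Y);
    results.Add((item.X, item.Y, result));
});

Console.WriteLine($"Collected {results.Count} results");

// Flattens the two nested loops into one lazy sequence of named tuples.
static IEnumerable<(int X, int Y)> GetNestedFor(int width, int height)
{
    for (int x = 0; x < width; x++)
    {
        for (int y = 0; y < height; y++)
        {
            yield return (x, y);
        }
    }
}

// Hypothetical stand-in for the real calculation (an assumption, not from the question).
static int Calculate(int x, int y) => x * 1000 + y;
```

Storing the inputs next to each result means you can still tell which pair produced which value, even though the bag does not preserve any order.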

Upvotes: 1
