ebvtrnog
ebvtrnog

Reputation: 4367

Tricky way of doing foreach in C#

I will first provide the pseudocode and describe it below:

public void RunUntilEmpty(List<Job> jobs)
{
    while (jobs.Any()) // the list "jobs" will be modified during the execution
    {
        List<Job> childJobs = new List<Job>();

        Parallel.ForEach(jobs, job => // this will be done in parallel
        {
            List<Job> newJobs = job.Do(); // after a job is done, it may return new jobs to do
            lock (childJobs)
                childJobs.AddRange(newJobs); // I would like to add those jobs to the "pool"
        });

        jobs = childJobs;
    }
}

As you can see, I am performing a unique type of foreach. The source, the set (jobs), can simply be enhanced during the execution and this behaviour cannot be determined earlier. When the method Do() is called on an object (here, job), it may return new jobs to perform and thus would enhance the source (jobs).

I could call this method (RunUntilEmpty) recursively, but unfortunately the stack can be really huge and is likely to result in an overflow.

Could you please tell me how to achieve this? Is there a way of doing this kind of actions in C#?

Upvotes: 1

Views: 152

Answers (1)

Peter Duniho
Peter Duniho

Reputation: 70652

If I understand correctly, you basically start out with some collection of Job objects, each representing some task which can itself create one or more new Job objects as a result of performing its task.

Your updated code example looks like it will basically accomplish this. But note that, as commenter CommuSoft points out, it won't make most efficient use of your CPU cores. Because you are only updating the list of jobs after each group of jobs has completed, there's no way for newly-generated jobs to run until all of the previously-generated jobs have completed.

A better implementation would use a single queue of jobs, continually retrieving new Job objects for execution as old ones complete.

I agree that TPL Dataflow may be a useful way to implement this. However, depending on your needs, you might find it simple enough to just queue the tasks directly to the thread pool and use CountdownEvent to track the progress of the work so that your RunUntilEmpty() method knows when to return.

Without a good, minimal, complete code example, it's impossible to provide an answer that includes a similarly complete code example. But hopefully the below snippet illustrates the basic idea well enough:

public void RunUntilEmpty(List<Job> jobs)
{
    CountdownEvent countdown = new CountdownEvent(1);

    QueueJobs(jobs, countdown);

    countdown.Signal();
    countdown.Wait();
}

private static void QueueJobs(List<Job> jobs, CountdownEvent countdown)
{
    foreach (Job job in jobs)
    {
        countdown.AddCount(1);

        Task.Run(() =>
        {
            // after a job is done, it may return new jobs to do
            QueueJobs(job.Do(), countdown);

            countdown.Signal();
        });
    }
}

The basic idea is to queue a new task for each Job object, incrementing the counter of the CountdownEvent for each task that is queued. The tasks themselves do three things:

  1. Run the Do() method,
  2. Queue any new tasks, using the QueueJobs() method so that the CountdownEvent object's counter is incremented accordingly, and
  3. Signal the CountdownEvent, decrementing its counter for the current task

The main RunUntilEmpty() signals the CountdownEvent to account for the single count it contributed to the object's counter when it created it, and then waits for the counter to reach zero.

Note that the calls to QueueJobs() are not recursive. The QueueJobs() method is not called by itself, but rather by the anonymous method declared within it, which is itself also not called by QueueJobs(). So there is no stack-overflow issue here.

The key feature in the above is that tasks are continuously queued as they become known, i.e. as they are returned by the previously-executed Do() method calls. Thus, the available CPU cores are kept busy by the thread pool, at least to the extent that any completed Do() method has in fact returned any new Job object to run. This addresses the main problem with the version of the code you've included in your question.

Upvotes: 2

Related Questions