Timeout for Action in Parallel.ForEach iteration

I have something similar to this in my code:

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Process(item);
});

The thing is that I do a bunch of things inside the Process() method (connect to a file share, parse a file, save to the database, etc.), and I worry that something might go wrong along the way and cause an iteration to never finish. Could this ever happen?

Is there a way to set a timeout for the Process() method to avoid ending up with zombie threads?

UPDATE:

The easiest ways I've found to set a timeout are passing a delay in milliseconds to the CancellationTokenSource constructor or calling the Wait() method on a task with a timeout.

Option #1

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    var cts = new CancellationTokenSource(2000);
    Task task = Task.Factory.StartNew(() => Process(item), cts.Token);
});

Option #2

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Task task = Task.Factory.StartNew(() => Process(item));
    task.Wait(2000);
});

The problem is that neither option actually cancels the Process() method once it is running. Do I need to check for something inside Process()?

Upvotes: 12

Views: 16484

Answers (3)

perNalin

Reputation: 161

I ended up with a slightly different implementation. In my case, checking the CancellationToken within Process() wouldn't work because of potentially long-running statements between checks. For example, if my timeout was 5 seconds and a single statement took 100 seconds, I wouldn't find out until that statement completed and the next if (token.IsCancellationRequested) check ran.

Here's what I ended up doing:

     Parallel.ForEach(myList, (item) =>
     {
         Task task = Task.Factory.StartNew(() =>
         {
             Process(item);
         });

         if(task.Wait(10000)) // Specify overall timeout for Process() here
             Console.WriteLine("Didn't Time out. All's good!"); 
         else
             Console.WriteLine("Timed out. Leave this one and continue with the rest"); 
     });

Then, within the Process() method, I added timeouts on the potentially long-running statements so that it can handle timeouts gracefully (as much as possible). So it's only in the worst case that Process() has to be abandoned by the Task.Wait() timeout above. Note that Task.Wait() only stops waiting; it does not stop the task, which keeps running in the background.

    private void Process(MyItem item)
    {
      ...
        cmd.CommandTimeout = 5; // In seconds. The timeouts within Process() add up
                                // to less than the overall Task.Wait() duration.
                                // Unfortunately, some potentially long-running methods
                                // don't have a built-in timeout.
      ...
    }
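
To illustrate that worst case: Task.Wait(timeout) only stops the caller from waiting, and the task itself carries on. Here is a minimal sketch of that behaviour, not the code from this answer. SlowProcess is a hypothetical stand-in for a Process() call that overruns, and the durations are arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitTimeoutDemo
{
    // Hypothetical stand-in for a Process() call that takes too long.
    private static void SlowProcess()
    {
        Thread.Sleep(500);
    }

    // Returns whether the timed wait succeeded, and whether the
    // "abandoned" work still ran to completion afterwards.
    public static (bool WaitSucceeded, bool WorkStillFinished) Run()
    {
        bool finished = false;
        Task task = Task.Run(() =>
        {
            SlowProcess();
            finished = true; // reached even though the caller gave up waiting
        });

        // Gives up after 100 ms; the task is not cancelled or stopped.
        bool waitSucceeded = task.Wait(100);

        // Block without a timeout to observe that the work kept running.
        task.Wait();

        return (waitSucceeded, finished);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"Wait succeeded: {result.WaitSucceeded}, work still finished: {result.WorkStillFinished}");
    }
}
```

So a timed-out item still occupies a thread until its own internal timeouts (such as CommandTimeout) kick in, which is why those inner timeouts matter.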

Upvotes: 1

I ended up combining both options. It works, but I don't know if this is the proper way to do it.

Solution:

    Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
    {
        var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var token = tokenSource.Token;

        Task task = Task.Factory.StartNew(() => Process(item, token), token);
        task.Wait();
    });

and in Process() I check for cancellation multiple times:

    private void Process(MyItem item, CancellationToken token)
    {
        try
        {
            // ThrowIfCancellationRequested() already checks IsCancellationRequested,
            // so no separate if is needed.
            token.ThrowIfCancellationRequested();

            ...statements

            token.ThrowIfCancellationRequested();

            ...more statements

            token.ThrowIfCancellationRequested();

            ...etc
        }
        catch (OperationCanceledException)
        {
            // Only cancellation is reported here; other exceptions propagate.
            Console.WriteLine("Operation cancelled");
        }
    }
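
For reference, here is a self-contained sketch of the same per-item timeout idea. It is a variation, not the exact code above: it calls Process() directly in the loop body instead of wrapping it in Task.Factory.StartNew, since the Parallel.ForEach body already runs on a worker thread. The Process() here is hypothetical: it simulates five steps with Thread.Sleep and checks the token before each one. The items are just step durations in milliseconds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PerItemTimeoutDemo
{
    // Hypothetical Process(): five steps of stepMs each, with a
    // cancellation check before every step.
    private static void Process(int stepMs, CancellationToken token)
    {
        for (int step = 0; step < 5; step++)
        {
            token.ThrowIfCancellationRequested();
            Thread.Sleep(stepMs); // stands in for one real statement
        }
    }

    // Runs every item with its own timeout and returns the items that timed out.
    public static int[] Run(int[] stepDurationsMs, TimeSpan timeout)
    {
        var timedOut = new ConcurrentBag<int>();

        Parallel.ForEach(stepDurationsMs, new ParallelOptions { MaxDegreeOfParallelism = 4 }, stepMs =>
        {
            // One CancellationTokenSource per item, cancelled automatically after the timeout.
            using (var cts = new CancellationTokenSource(timeout))
            {
                try
                {
                    Process(stepMs, cts.Token);
                }
                catch (OperationCanceledException)
                {
                    timedOut.Add(stepMs); // skip this item and continue with the rest
                }
            }
        });

        return timedOut.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        int[] timedOut = Run(new[] { 1, 300 }, TimeSpan.FromMilliseconds(500));
        Console.WriteLine("Timed out items: " + string.Join(", ", timedOut));
    }
}
```

The limitation raised in the other answer still applies: cancellation is only noticed at the next check, so a single long statement between checks runs to completion first.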

Upvotes: 3

poy

Reputation: 10517

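Consider passing a CancellationToken through your code. That way, the operations can be cancelled cooperatively at any point where the token is checked.

Then you can use the CancellationTokenSource.CancelAfter() method to request cancellation after a timeout.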
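
A minimal sketch of CancelAfter(), under some assumptions: RunWithDeadline is a hypothetical helper, and the "work" is simulated by waiting on the token's WaitHandle, which returns early if cancellation is requested. Real code would pass the token into Process() and check it there.

```csharp
using System;
using System.Threading;

public static class CancelAfterDemo
{
    // Returns true if the simulated work finished before the deadline,
    // false if the token was cancelled first.
    public static bool RunWithDeadline(int workMs, int deadlineMs)
    {
        using (var cts = new CancellationTokenSource())
        {
            // Schedule cancellation deadlineMs from now.
            cts.CancelAfter(deadlineMs);

            // Simulated work: wait up to workMs, waking early on cancellation.
            // WaitOne returns true if the token was signalled during the wait.
            bool cancelled = cts.Token.WaitHandle.WaitOne(workMs);
            return !cancelled;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunWithDeadline(50, 1000)); // work is shorter than the deadline
        Console.WriteLine(RunWithDeadline(1000, 50)); // deadline hits first
    }
}
```

ParallelOptions also has a CancellationToken property. Setting it lets a single token stop the loop from starting new iterations, in addition to whatever checks Process() does itself.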

Upvotes: 2
