Reputation: 276
I have something similar to this in my code:
Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
Process(item);
});
The thing is that I do a bunch of things inside Process()
method (connect to a file share, parse a file, save to db, etc) and I worry that something might go wrong during this process making the iteration never finish...could this ever happen?
Is there a way to set a timeout for the Process()
method to avoid ending up having zombie threads?
UPDATE:
The easiest way I've found for setting a timeout is by adding milliseconds to a CancellationTokenSource
or calling the Wait()
method on a task.
Option #1
Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
var cts = new CancellationTokenSource(2000);
Task task = Task.Factory.StartNew(() => Process(item), cts.Token);
});
Option #2
Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
Task task = Task.Factory.StartNew(() => Process(item));
task.Wait(2000);
});
The problem is that none of those options are able to cancel the Process()
method. Do I need to check for something in the Process()
method?
Upvotes: 12
Views: 16484
Reputation: 161
I ended up with a slightly different implementation. In my case, checking for the CancellationToken
within Process()
wouldn't work due to potentially long running statements in between checks. For example if my timeout was 5 seconds, and a single statement took say 100 seconds ... I wouldn't know until that statement completed and then was detected by if (token.IsCancellationRequested)
.
Here's what I ended up doing
Parallel.ForEach(myList, (item) =>
{
Task task = Task.Factory.StartNew(() =>
{
Process(item));
});
if(task.Wait(10000)) // Specify overall timeout for Process() here
Console.WriteLine("Didn't Time out. All's good!");
else
Console.WriteLine("Timed out. Leave this one and continue with the rest");
});
Then within the Process()
method, I added further checks on potentially long running statements to allow it to gracefully handle timeouts (as much as possible). So it's only in the worst case that Process() had to be prematurely stopped by Task.Wait()
above.
private void Process(MyItem item)
{
...
cmd.CommandTimeout = 5; // The total of timeouts within Process() were
// set to be less than the total Task.Wait duration.
// Unfortunately some potentially long running methods
// don't have a built in timeout.
...
}
Upvotes: 1
Reputation: 276
I ended up combining both options. It works but I don't know if this is the proper way to do this.
Solution:
Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var token = tokenSource.Token;
Task task = Task.Factory.StartNew(() => Process(item, token), token);
task.Wait();
});
and in Process()
I check for cancellation multiple times:
private void Process(MyItem item, CancellationToken token)
{
try
{
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
...sentences
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
...more sentences
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
...etc
}
catch(Exception ex)
Console.WriteLine("Operation cancelled");
Upvotes: 3
Reputation: 10517
Consider adding CancellationToken
to your code. This way, at any point you can properly cancel all the operations.
Then, you can use the CancelAfter()
method.
Upvotes: 2