Reputation: 777
Is there any way to stop the other concurrent threads if one of the threads fails? I am working on an ASP.NET MVC website which makes heavy use of API calls. Instead of sending the API requests sequentially, I am creating threads, with each thread making one API call. If any thread fails, I want to kill the other threads. How can I do that? I used Task.WaitAll, but it lets all the other threads run to completion even if one thread fails. Please help.
UPDATE: To simulate a real-world scenario, I have created a console app. Please refer to the code below. I want to make the fifth thread fail. After the fifth thread fails, I want all other running threads to stop.
public class Program
{
    static int[] delays = { 3, 2, 10, 4, 5, 6, 7, 8, 9, 1 };
    static int[] ids = new int[10];
    static Task[] tasks = null;
    static CancellationTokenSource tokenSource = new CancellationTokenSource();
    static IList<int> threadsExecuted = new List<int>();

    static void Main(string[] args)
    {
        ids = Enumerable.Range(1, 10).ToArray();
        try
        {
            tasks = ids.Select(id => MyTaks(id)).ToArray();
            Task.WaitAll(tasks);
        }
        catch (Exception exception)
        {
            Console.WriteLine("Exception in Main::\nMessage: " + exception.Message +
                "StackTrace: " + exception.StackTrace +
                "InnerException: " + exception.InnerException);
        }
        Console.WriteLine("\n\nThreads executed: " + string.Join(", ", threadsExecuted.OrderBy(id => id).ToArray()));
        Console.WriteLine("\n\n\nExit..");
        Console.Read();
    }

    private static async Task MyTaks(int id)
    {
        var delay = delays[id - 1] * 1000;
        Console.WriteLine("Thread id #" + id + " started with delay " + delay + " seconds.");
        CancellationToken cToken = tokenSource.Token;
        Task task = new Task(() =>
        {
            Thread.Sleep(delay);
            if (id == 5) //Fail
            {
                Console.WriteLine("Cancelling..");
                throw new Exception("Thread id #" + id + " failed.");
            }
        }, cToken);
        task.Start();
        await task;
        Console.WriteLine("Thread id #" + id + " executed.");
        threadsExecuted.Add(id);
    }
}
Upvotes: 1
Views: 2292
Reputation: 456917
> Instead of sending too many API requests in sequential fashion, I am creating threads, each thread consuming one API call.
This approach will pretty much destroy any scalability for your server. Instead of parallel concurrency (multiple threads), you should use asynchronous concurrency (multiple operations). Your API code would use HttpClient.GetAsync and pass a CancellationToken.
So, your test code would look something like this:
static void Main(string[] args)
{
    ids = Enumerable.Range(1, 10).ToArray();
    try
    {
        tasks = ids.Select(id => MyTaks(id)).ToArray();
        Task.WaitAll(tasks);
    }
    catch (Exception exception)
    {
        Console.WriteLine("Exception in Main::\nMessage: " + exception.Message +
            "StackTrace: " + exception.StackTrace +
            "InnerException: " + exception.InnerException);
    }
    lock (threadsExecuted)
        Console.WriteLine("\n\nThreads executed: " + string.Join(", ", threadsExecuted.OrderBy(id => id).ToArray()));
    Console.WriteLine("\n\n\nExit..");
    Console.Read();
}

private static async Task MyTaks(int id)
{
    // The delays array holds seconds; Task.Delay takes milliseconds.
    var delay = delays[id - 1] * 1000;
    Console.WriteLine("Task id #" + id + " started with delay " + delay + " ms.");
    CancellationToken cToken = tokenSource.Token;
    // Production code would use `await httpClient.GetAsync(url, token)` here.
    await Task.Delay(delay, cToken);
    if (id == 5) //Fail
    {
        Console.WriteLine("Cancelling..");
        // Signal every other task that is still waiting on the shared token.
        tokenSource.Cancel();
        throw new Exception("Task id #" + id + " failed.");
    }
    Console.WriteLine("Task id #" + id + " executed.");
    lock (threadsExecuted)
        threadsExecuted.Add(id);
}
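In an ASP.NET MVC action you would normally await the tasks rather than block on Task.WaitAll. The following is only a minimal sketch of that idea, not the code from this answer: FailFast, RunAllAsync and DemoAsync are hypothetical names, Task.Delay stands in for the real API call, and the delays are shortened to milliseconds. The wrapper cancels the shared token as soon as any operation throws a non-cancellation exception, so the individual operations do not need to call Cancel themselves.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FailFast
{
    // Hypothetical helper: starts one operation per id and cancels the
    // remaining operations as soon as any of them fails.
    public static async Task RunAllAsync(
        IEnumerable<int> ids,
        Func<int, CancellationToken, Task> operation)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task[] tasks = ids.Select(async id =>
            {
                try
                {
                    await operation(id, cts.Token);
                }
                catch (Exception ex) when (!(ex is OperationCanceledException))
                {
                    cts.Cancel(); // a real failure: signal the sibling operations
                    throw;
                }
            }).ToArray();

            // Completes once every task has finished, failed or been cancelled.
            // Awaiting it rethrows the first failure, if there was one.
            await Task.WhenAll(tasks);
        }
    }

    // Demo with simulated API calls; returns the ids that ran to completion.
    public static async Task<List<int>> DemoAsync()
    {
        int[] delaysMs = { 300, 200, 1000, 400, 500, 600, 700, 800, 900, 100 };
        var completed = new List<int>();
        try
        {
            await RunAllAsync(Enumerable.Range(1, 10), async (id, token) =>
            {
                await Task.Delay(delaysMs[id - 1], token); // stand-in for an API call
                if (id == 5)
                    throw new InvalidOperationException("Task #" + id + " failed.");
                lock (completed) completed.Add(id);
            });
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Stopped early: " + ex.Message);
        }
        completed.Sort();
        return completed;
    }

    public static void Main()
    {
        List<int> done = DemoAsync().GetAwaiter().GetResult();
        Console.WriteLine("Completed: " + string.Join(", ", done));
    }
}
```

Only the operations that finish before task 5 fails should show up in the completed list. The operations still waiting observe the cancelled token and stop. Cancellation is cooperative, so this only works if every operation actually passes the token on to whatever it awaits.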
Upvotes: 2
Reputation:
You need to use the overload of Task.WaitAll() that takes a CancellationToken.
For further information, see "Cancellation in Managed Threads".
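As a rough illustration of that overload, here is a small console sketch. WaitAllWithToken and WaitOrGiveUp are hypothetical names invented for the example. Note what it demonstrates: cancelling the token makes Task.WaitAll stop waiting by throwing an OperationCanceledException, but a task that does not observe the token itself keeps running.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAllWithToken
{
    // Returns true if the wait was abandoned because the token was cancelled,
    // false if all tasks completed first.
    public static bool WaitOrGiveUp(Task[] tasks, CancellationToken token)
    {
        try
        {
            Task.WaitAll(tasks, token);
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // This task never looks at the token, so cancellation cannot stop it.
            Task slow = Task.Delay(2000);

            // This task simulates a failure by cancelling the shared source.
            Task canceller = Task.Run(async () =>
            {
                await Task.Delay(200);
                cts.Cancel();
            });

            bool abandoned = WaitOrGiveUp(new[] { slow, canceller }, cts.Token);
            Console.WriteLine("Wait abandoned: " + abandoned);
            Console.WriteLine("Slow task still running: " + !slow.IsCompleted);
        }
    }
}
```

To actually stop the work, each task also has to receive the same token and check it (or pass it to the calls it awaits).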
Upvotes: 0
Reputation: 8273
You could try something like this if you want to cancel a collection of tasks:
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;

Random rnd = new Random();
Object lockObj = new Object();

List<Task<int[]>> tasks = new List<Task<int[]>>();
TaskFactory factory = new TaskFactory(token);
for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
    int iteration = taskCtr + 1;
    tasks.Add(factory.StartNew( () => {
        int value;
        int[] values = new int[10];
        for (int ctr = 1; ctr <= 10; ctr++) {
            token.ThrowIfCancellationRequested();
            lock (lockObj) {
                value = rnd.Next(0,101);
            }
            if (value == 0) {
                source.Cancel();
                Console.WriteLine("Cancelling at task {0}", iteration);
                break;
            }
            values[ctr-1] = value;
        }
        return values;
    }, token));
}
try {
    Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(),
        (results) => {
            Console.WriteLine("Calculating overall mean...");
            long sum = 0;
            int n = 0;
            foreach (var t in results) {
                foreach (var r in t.Result) {
                    sum += r;
                    n++;
                }
            }
            return sum/(double) n;
        } , token);
    Console.WriteLine("The mean is {0}.", fTask.Result);
}
catch (AggregateException ae) {
    foreach (Exception e in ae.InnerExceptions) {
        if (e is TaskCanceledException)
            Console.WriteLine("Unable to compute mean: {0}",
                ((TaskCanceledException) e).Message);
        else
            Console.WriteLine("Exception: " + e.GetType().Name);
    }
}
finally {
    source.Dispose();
}
Update: To cancel a Web API call using a cancellation token, the API action needs to accept a CancellationToken:
[AsyncTimeout(150)]
[HandleError(ExceptionType = typeof(TimeoutException),
    View = "TimeoutError")]
public async Task<ActionResult> GizmosCancelAsync(
    CancellationToken cancellationToken )
{
    ViewBag.SyncOrAsync = "Asynchronous";
    var gizmoService = new GizmoService();
    return View("Gizmos",
        await gizmoService.GetGizmosAsync(cancellationToken));
}
Asynchronous action methods returning Task<ActionResult> are cancelable; that is, they take a CancellationToken parameter when one is provided with the AsyncTimeout attribute.
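If the action should also be able to cancel its own outgoing calls when one of them fails, one option is to link the incoming token to a local source with CancellationTokenSource.CreateLinkedTokenSource. The console sketch below is an illustration under assumptions, not MVC code: LinkedCancellation and CallApisAsync are hypothetical names, Task.Delay stands in for the API calls, and a CancellationTokenSource with a 200 ms timeout plays the role of the token MVC would supply.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedCancellation
{
    // Hypothetical stand-in for an action body that makes two API calls.
    public static async Task<string> CallApisAsync(CancellationToken requestToken)
    {
        // The linked source is cancelled when requestToken is cancelled, and
        // can also be cancelled locally (linked.Cancel()) if one call fails.
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(requestToken))
        {
            try
            {
                await Task.WhenAll(
                    Task.Delay(5000, linked.Token),  // stand-in for API call 1
                    Task.Delay(5000, linked.Token)); // stand-in for API call 2
                return "completed";
            }
            catch (OperationCanceledException)
            {
                return "cancelled";
            }
        }
    }

    public static void Main()
    {
        // Simulates the request timeout: the token is cancelled after 200 ms.
        using (var request = new CancellationTokenSource(200))
        {
            string outcome = CallApisAsync(request.Token).GetAwaiter().GetResult();
            Console.WriteLine(outcome);
        }
    }
}
```

Cancelling the linked source locally does not cancel the incoming request token, so the two reasons for stopping (timeout versus a failed sibling call) stay distinguishable.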
Upvotes: 1