Reputation: 4063
Background:
I've got a console app which creates Tasks to process data from a DB (let's call them Level1 tasks). Each of these Tasks creates its own Tasks to process each part of the data that was assigned to it (Level2 tasks).
Each of the Level2 tasks has a continuation task associated with it, and the code used to do a WaitAll on the continuation tasks before moving on.
I'm on .NET 4.0 (no async/await).
Issue:
This created a problem, though: it turned out that, done this way, none of the Level2 tasks started before all available Level1 tasks had been scheduled, which is far from optimal.
Question:
Changing the code to wait for both the original Level2 task AND its continuation task seems to fix this. However, I'm not entirely sure why.
Do you have any ideas?
The only thing I could come up with is that, since the continuation task has not started, there is no point in waiting for it to complete. But even if that were the case, I would expect at least SOME of the Level2 tasks to have started, which they never did.
Example:
I created a sample console app that demonstrates exactly that behavior:
Run it as is and you will see that it schedules all the tasks first, and only then do you start getting the lines written from within the Level2 tasks.
But comment out the marked block of code, uncomment the replacement, and everything works as expected.
Can you tell me why?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    static void Main(string[] args)
    {
        for (var i = 0; i < 100; i++)
        {
            Task.Factory.StartNew(() => SomeMethod());
            //Thread.Sleep(1000);
        }
        Console.ReadLine();
    }
    private static void SomeMethod()
    {
        var numbers = new List<int>();
        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }
        var tasks = new List<Task>();
        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task");
            var numberSafe = number;
            /* Code to be replaced START */
            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0}", task.Id);
            });
            tasks.Add(nextTask);
            /* Code to be replaced END */
            /* Replacement START */
            //var originalTask = Task.Factory.StartNew(() =>
            //{
            //    Console.WriteLine("Got number: {0}", numberSafe);
            //});
            //var contTask = originalTask
            //    .ContinueWith(task =>
            //    {
            //        Console.WriteLine("Continuation {0}", task.Id);
            //    });
            //tasks.Add(originalTask);
            //tasks.Add(contTask);
            /* Replacement END */
        }
        Task.WaitAll(tasks.ToArray());
    }
}
Upvotes: 5
Views: 2725
Reputation: 61666
The problem with your code is the blocking Task.WaitAll(tasks.ToArray()). The default TPL task scheduler will not use a new pool thread for each task you start with Factory.StartNew, and you start 100 Level1 tasks, each blocking a thread with Task.WaitAll.
This creates a bottleneck. With the default ThreadPool size, I'm getting ~20 threads running concurrently, with only 4 of them actually executing simultaneously (the number of CPU cores).
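To check the pool figures this explanation depends on, you can query the pool directly. A minimal sketch (the class name PoolInfo is hypothetical, and the numbers printed vary by machine and runtime version): the pool creates threads on demand up to its minimum, which is typically the number of logical processors, and beyond that adds threads only gradually, which is why 100 blocked Level1 tasks starve the Level2 work.

```csharp
using System;
using System.Threading;

public static class PoolInfo
{
    // Current minimum number of worker threads the pool creates on demand
    // before it starts throttling thread injection.
    public static int MinWorkerThreads()
    {
        int worker, io;
        ThreadPool.GetMinThreads(out worker, out io);
        return worker;
    }

    public static void Main()
    {
        int maxWorker, maxIo;
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        Console.WriteLine("Logical processors: {0}", Environment.ProcessorCount);
        Console.WriteLine("Min worker threads: {0}", MinWorkerThreads());
        Console.WriteLine("Max worker threads: {0}", maxWorker);

        // Diagnostic experiment only, not a fix: if the starvation explanation
        // is right, raising the minimum before starting the 100 Level1 tasks
        // should make the Level2 output appear much earlier.
        // int minWorker, minIo;
        // ThreadPool.GetMinThreads(out minWorker, out minIo);
        // ThreadPool.SetMinThreads(200, minIo);
    }
}
```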
Thus, some tasks will only be queued and will be started later, as the earlier tasks are getting completed. To see what I mean, try changing your code like this:
static void Main(string[] args)
{
    for (var i = 0; i < 100; i++)
    {
        Task.Factory.StartNew(() => SomeMethod(),
            TaskCreationOptions.LongRunning);
    }
    Console.ReadLine();
}
TaskCreationOptions.LongRunning will give you the desired behavior (with the default scheduler, each such task gets a dedicated thread instead of a pool thread), but this would of course be the wrong solution.
The right solution is to avoid blocking code where possible. If you have to block at all, do it at the topmost level only.
To address this, your code can be refactored as below. Note the use of ContinueWhenAll, Unwrap and (optionally) ExecuteSynchronously, which help eliminate the blocking code and reduce the number of pool threads involved. This version performs much better.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class Program
{
    static void Main(string[] args)
    {
        var tasks = new List<Task>();
        for (var i = 0; i < 100; i++)
        {
            var n = i; // copy the loop variable: otherwise every lambda shares the same 'i'
            tasks.Add(Task.Factory.StartNew(() => SomeMethod(n)).Unwrap());
        }
        // blocking at the topmost level
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("Enter to exit...");
        Console.ReadLine();
    }
    private static Task<Task[]> SomeMethod(int n)
    {
        Console.WriteLine("SomeMethod " + n);
        var numbers = new List<int>();
        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }
        var tasks = new List<Task>();
        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task " + number);
            var numberSafe = number;
            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0}", task.Id);
            }, TaskContinuationOptions.ExecuteSynchronously);
            tasks.Add(nextTask);
        }
        return Task.Factory.ContinueWhenAll(tasks.ToArray(),
            result => result, TaskContinuationOptions.ExecuteSynchronously);
    }
}
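Why Unwrap is needed in the code above may be easier to see in isolation. A minimal sketch (UnwrapDemo is a hypothetical name): when the delegate passed to StartNew itself returns a task, the result is a task of a task, and the outer task completes as soon as the inner one has been created, not when it finishes. Unwrap returns a proxy that completes only when the inner task does.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static int Run()
    {
        // The lambda returns a Task<int>, so StartNew yields Task<Task<int>>.
        Task<Task<int>> outer = Task.Factory.StartNew(() =>
            Task.Factory.StartNew(() => 21 * 2));

        // Unwrap() gives a proxy Task<int> that completes when the inner task
        // completes and carries its result (or its exception).
        Task<int> proxy = outer.Unwrap();
        return proxy.Result; // blocking here only because this is the top level
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 42
    }
}
```

In the refactored program, the same mechanism turns the Task<Task<Task[]>> returned by StartNew into a Task<Task[]> that completes only when all Level2 continuations of that Level1 task have finished.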
Ideally, in a real-life project you should stick with naturally asynchronous APIs wherever possible (e.g., "Using SqlDataReader’s new async methods in .Net 4.5"), and use Task.Run / Task.Factory.StartNew only for CPU-bound computational tasks. And for server-side applications (e.g., ASP.NET Web API), Task.Run / Task.Factory.StartNew will usually only add the overhead of redundant thread switching and hurt scalability. It won't speed up the completion of an HTTP request unless you really need to run multiple CPU-bound jobs in parallel.
I understand the following might not be a feasible option, but I'd highly recommend upgrading to VS2012+ and using async/await to implement logic like this. It'd be very well worth the investment, as it greatly speeds up the coding process and produces simpler, cleaner and less error-prone code. You would still be able to target .NET 4.0 with Microsoft.Bcl.Async.
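For comparison, here is a rough sketch of the same Level1/Level2 structure written with async/await. It assumes C# 5 and .NET 4.5 (Task.Run, Task.WhenAll); with Microsoft.Bcl.Async on .NET 4.0 the equivalents are TaskEx.Run and TaskEx.WhenAll. The class and method names are hypothetical, and Task.Run merely stands in for the real DB work.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncVersion
{
    // Level2: hypothetical stand-in for the per-part work. A real version would
    // ideally await a naturally asynchronous DB call instead of Task.Run.
    private static async Task ProcessPartAsync(int level1, int number)
    {
        await Task.Run(() => Console.WriteLine("Got number: {0}.{1}", level1, number));
        // Code after 'await' plays the role of the original ContinueWith.
        Console.WriteLine("Continuation {0}.{1}", level1, number);
    }

    // Level1: starts all of its Level2 work and returns a task that completes
    // when all of it has finished, without blocking a pool thread meanwhile.
    private static Task ProcessChunkAsync(int level1)
    {
        return Task.WhenAll(Enumerable.Range(0, 10)
            .Select(n => ProcessPartAsync(level1, n)));
    }

    public static int Run()
    {
        Task[] level1Tasks = Enumerable.Range(0, 100)
            .Select(i => ProcessChunkAsync(i))
            .ToArray();
        Task.WaitAll(level1Tasks); // the only blocking wait, at the top level
        return level1Tasks.Length;
    }

    public static void Main()
    {
        Console.WriteLine("Completed {0} Level1 tasks", Run());
    }
}
```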
Upvotes: 2
Reputation: 83
If I remember correctly, waiting on a task that has been queued but has not yet started may execute it synchronously (see here). It wouldn't be very surprising if this behavior applied to your code in the alternative case.
Keeping in mind that threading behaviour is highly implementation- and machine-dependent, what happens here is probably something along these lines:
What changes when you use your alternative method is that, because you reference the "Level 2" task directly in the array of tasks to wait for, the Task.WaitAll method gets the chance to execute the "Level 2" tasks synchronously instead of idling away. This could not happen in the initial case, because a continuation task is not even queued until its antecedent completes, so there is nothing for WaitAll to run inline.
In conclusion, waiting in ThreadPool threads is what led you to thread starvation and the strange behavior you observed. While the inlining optimization in the waiting code made the thread starvation fade away, it is clearly not something you should rely on.
To solve your initial problem, you would be better off following the suggestion made by lil-raz of doing away with your inner tasks.
If you have access to C# 5.0, you could also consider using the async/await pattern to write your code without relying on waiting.
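The point that a continuation cannot be inlined can be checked directly: until its antecedent completes, a continuation is only registered, not queued to any scheduler, so a waiting thread has nothing it could run inline. A minimal sketch (ContinuationStatusDemo is a hypothetical name) that blocks the antecedent on an event and inspects the continuation's status:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationStatusDemo
{
    public static TaskStatus StatusWhileAntecedentBlocked()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            // The antecedent cannot finish until the gate is opened.
            Task antecedent = Task.Factory.StartNew(() => gate.Wait());
            Task continuation = antecedent.ContinueWith(t => { });

            // Still only registered on the antecedent, not queued anywhere.
            TaskStatus status = continuation.Status;

            gate.Set();          // let the antecedent complete
            continuation.Wait(); // now the continuation is queued and runs
            return status;
        }
    }

    public static void Main()
    {
        Console.WriteLine(StatusWhileAntecedentBlocked()); // prints WaitingForActivation
    }
}
```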
Upvotes: 1
Reputation: 7602
I think you are seeing the Task Inlining behavior. Quoting from MSDN:
In some cases, when a Task is waited on, it may be executed synchronously on the Thread that is performing the wait operation. This enhances performance, as it prevents the need for an additional Thread by utilizing the existing Thread which would have blocked, otherwise. To prevent errors due to re-entrancy, task inlining only occurs when the wait target is found in the relevant Thread's local queue.
You don't need 100 tasks to see this. I've modified your program to have 4 Level1 tasks (I have a quad-core CPU). Each Level1 task creates only one Level2 task.
static void Main(string[] args)
{
    for (var i = 0; i < 4; i++)
    {
        int j = i;
        Task.Factory.StartNew(() => SomeMethod(j)); // j as level number
    }
    Console.ReadLine(); // keep the process alive while the pool threads run
}
In your original program, nextTask is the continuation task, so I just simplified the method.
private static void SomeMethod(int num)
{
    var numbers = new List<int>();
    // create only one level 2 task for representation purpose
    for (var i = 0; i < 1; i++)
    {
        numbers.Add(i);
    }
    var tasks = new List<Task>();
    foreach (var number in numbers)
    {
        Console.WriteLine("Before start task: {0} - thread {1}", num,
            Thread.CurrentThread.ManagedThreadId);
        var numberSafe = number;
        var originalTask = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Got number: {0} - thread {1}", num,
                Thread.CurrentThread.ManagedThreadId);
        });
        var contTask = originalTask
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0} - thread {1}", num,
                    Thread.CurrentThread.ManagedThreadId);
            });
        tasks.Add(originalTask); // comment and un-comment this line to see change in behavior
        tasks.Add(contTask); // same as adding nextTask in your original prog.
    }
    Task.WaitAll(tasks.ToArray());
}
Here is sample output with tasks.Add(originalTask); commented out, which corresponds to your first block:
Before start task: 0 - thread 4
Before start task: 2 - thread 3
Before start task: 3 - thread 6
Before start task: 1 - thread 5
Got number: 0 - thread 7
Continuation 0 - thread 7
Got number: 1 - thread 7
Continuation 1 - thread 7
Got number: 3 - thread 7
Continuation 3 - thread 7
Got number: 2 - thread 4
Continuation 2 - thread 4
And here is sample output with tasks.Add(originalTask); kept, which corresponds to your second block:
Before start task: 0 - thread 4
Before start task: 1 - thread 6
Before start task: 2 - thread 5
Got number: 0 - thread 4
Before start task: 3 - thread 3
Got number: 3 - thread 3
Got number: 1 - thread 6
Got number: 2 - thread 5
Continuation 0 - thread 7
Continuation 1 - thread 7
Continuation 3 - thread 7
Continuation 2 - thread 4
As you can see, in the second case, when you wait for originalTask on the same thread that started it, task inlining makes it run on that same thread, which is why you see the "Got number..." messages earlier.
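A smaller experiment isolating inlining, as a sketch (InliningDemo is a hypothetical name): a pool thread starts an inner task, which goes into that thread's local queue, and immediately waits on it. Because inlining is an optimization rather than a guarantee (another idle thread may steal the inner task first), the count varies from run to run and from machine to machine.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InliningDemo
{
    // Repeats the experiment 'runs' times and counts how often the inner task
    // executed on the same thread that was waiting for it (i.e. was inlined).
    public static int CountInlined(int runs)
    {
        int inlined = 0;
        for (int r = 0; r < runs; r++)
        {
            bool sameThread = Task.Factory.StartNew(() =>
            {
                int waiterId = Thread.CurrentThread.ManagedThreadId;
                int innerId = -1;
                // Created from a pool thread, so it is pushed onto this
                // thread's local queue (unless another thread steals it).
                Task inner = Task.Factory.StartNew(() =>
                {
                    innerId = Thread.CurrentThread.ManagedThreadId;
                });
                inner.Wait(); // the scheduler may run 'inner' inline right here
                return innerId == waiterId;
            }).Result;
            if (sameThread) inlined++;
        }
        return inlined;
    }

    public static void Main()
    {
        Console.WriteLine("Inlined {0} of 10 runs", CountInlined(10));
    }
}
```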
Upvotes: 4
Reputation: 1696
I have to say that this code is truly not optimal: you create 100 tasks, which does not mean you will get 100 threads, and inside each task you create two new tasks for every item, so you are oversubscribing the scheduler. If those tasks are related to DB reading, why not mark them as long-running (TaskCreationOptions.LongRunning) and discard the inner tasks?
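A minimal sketch of that suggestion, assuming each Level1 chunk can process its parts sequentially (LongRunningVersion and ProcessChunk are hypothetical stand-ins for the real DB work). With the default scheduler each LongRunning task typically gets a dedicated thread, so this only makes sense for a modest number of Level1 tasks:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LongRunningVersion
{
    // Hypothetical stand-in for the DB work of one Level1 task: the parts are
    // processed sequentially instead of spawning a Level2 task per part.
    private static int ProcessChunk(int chunk)
    {
        int processed = 0;
        for (int number = 0; number < 10; number++)
        {
            Console.WriteLine("Chunk {0}: got number {1}", chunk, number);
            processed++;
        }
        return processed;
    }

    public static int Run(int chunks)
    {
        var tasks = new List<Task<int>>();
        for (int i = 0; i < chunks; i++)
        {
            int chunk = i; // copy: the for-loop variable is shared by all closures
            tasks.Add(Task.Factory.StartNew(() => ProcessChunk(chunk),
                TaskCreationOptions.LongRunning));
        }
        Task.WaitAll(tasks.ToArray()); // Task<int>[] converts to Task[] (array covariance)
        int total = 0;
        foreach (var t in tasks) total += t.Result;
        return total;
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} parts", Run(4));
    }
}
```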
Upvotes: 0