Reputation: 61
I have a program that needs to run a number of async tasks in parallel. There are two implementations, Task1() and Task2(), and the main difference between them is:
Task1() saves FTask, of type Func<Task>, and later calls await Task.WhenAny(FTask()),
Task2() saves the Task produced by Task.Run() and later calls await Task.WhenAny(Task).
I thought they would be equivalent, but Task1() does not work properly, whereas Task2() does.
The flow of the program is: if there are available task contexts in the free queue, dequeue one and set it up with an async task; when the async task finishes, it puts the context back into the free queue.
The problem is that Task1(), which saves a Func<Task>, does not work: the async task seems to have more than one instance running.
The complete code is:
static async Task Task1()
{
    int qd = 4;
    var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
    Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
    Random r = new Random();
    long segs = 7, submit = 0;
    int token = 0;
    int ticket = 0;
    Console.WriteLine($"-------- Task1 start --------");
    while (submit < segs)
    {
        if (qFree.Count > 0)
        {
            var x = qFree.Dequeue();
            x.Token = token++;
            x.Delay1 = r.Next(10, 20);
            x.Delay2 = r.Next(30, 50);
            x.FTask = async () =>
            {
                Console.WriteLine($"[{x.Token}] start");
                await Task.Delay(x.Delay1);
                Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
                while (ticket != x.Token)
                {
                    await Task.Yield(); // yield and wait other task increases the ticket
                }
                Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
                await Task.Delay(x.Delay2);
                Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
                ticket++;
                qFree.Enqueue(x);
            };
            Console.WriteLine($"[{x.Token}] submit");
            submit++;
        }
        else await Task.WhenAny(taskPool.Select(x => x.FTask()));
    }
    await Task.WhenAll(taskPool.Select(x => x.FTask()));
    Console.WriteLine($"-------- Task1 finished --------");
}
static async Task Task2()
{
    int qd = 4;
    var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
    Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
    Random r = new Random();
    long segs = 7, submit = 0;
    int token = 0;
    int ticket = 0;
    Console.WriteLine($"-------- Task2 start --------");
    while (submit < segs)
    {
        if (qFree.Count > 0)
        {
            var x = qFree.Dequeue();
            x.Token = token++;
            x.Delay1 = r.Next(10, 20);
            x.Delay2 = r.Next(30, 50);
            x.Task = Task.Run(async () =>
            {
                Console.WriteLine($"[{x.Token}] start");
                await Task.Delay(x.Delay1);
                Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
                while (ticket != x.Token)
                {
                    await Task.Yield(); // yield and wait other task increases the ticket
                }
                Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
                await Task.Delay(x.Delay2);
                Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
                ticket++;
                qFree.Enqueue(x);
            });
            Console.WriteLine($"[{x.Token}] submit");
            submit++;
        }
        else await Task.WhenAny(taskPool.Select(x => x.Task));
    }
    await Task.WhenAll(taskPool.Select(x => x.Task));
    Console.WriteLine($"-------- Task2 finished --------");
}
public class TaskCtx
{
    public int Token;
    public int Delay1;
    public int Delay2;
    public Func<Task> FTask;
    public Task Task;
    public TaskCtx()
    {
        FTask = async () => await Task.CompletedTask;
        Task = Task.CompletedTask;
    }
}
The output of Task1():
-------- Task1 start --------
[0] submit
[1] submit
[2] submit
[3] submit
[0] start
[1] start
[2] start
[3] start
[0] wait ticket: 0
[0] acquire ticket: 0
[2] wait ticket: 0
[1] wait ticket: 0
[3] wait ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1 <-- [1] is running and got the ticket
[4] submit
[4] start
[1] start <-- another [1] starts ???
[2] start
[3] start
[2] wait ticket: 1
[1] wait ticket: 1
[1] acquire ticket: 1
[4] wait ticket: 1
[3] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[2] acquire ticket: 2
[1] release ticket: 2
[3] acquire ticket: 3
[3] acquire ticket: 3
[5] submit
[6] submit
[4] start
[6] start
[2] start
[3] start
[2] wait ticket: 3
[4] wait ticket: 3
[6] wait ticket: 3
[3] wait ticket: 3
[3] acquire ticket: 3
[2] release ticket: 3
[4] acquire ticket: 4
[2] release ticket: 3
[4] acquire ticket: 4
[3] release ticket: 5
[6] acquire ticket: 6
[3] release ticket: 5
[3] release ticket: 7
[4] release ticket: 8
[4] release ticket: 8
[6] release ticket: 10
The output of Task2():
-------- Task2 start --------
[0] submit
[1] submit
[2] submit
[3] submit
[0] start
[1] start
[2] start
[3] start
[2] wait ticket: 0
[1] wait ticket: 0
[3] wait ticket: 0
[0] wait ticket: 0
[0] acquire ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1
[4] submit
[4] start
[4] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[5] submit
[5] start
[5] wait ticket: 2
[2] release ticket: 2
[3] acquire ticket: 3
[6] start
[6] submit
[6] wait ticket: 3
[3] release ticket: 3
[4] acquire ticket: 4
[4] release ticket: 4
[5] acquire ticket: 5
[5] release ticket: 5
[6] acquire ticket: 6
[6] release ticket: 6
-------- Task2 finished --------
UPDATE: I made another implementation that uses a normal async method instead of an anonymous function:
static async Task Task3()
{
    int qd = 4;
    var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
    Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
    Random r = new Random();
    long segs = 7, submit = 0;
    int token = 0;
    Ticket ticket = new Ticket(0);
    Console.WriteLine($"-------- Task3 start --------");
    while (submit < segs)
    {
        if (qFree.Count > 0)
        {
            var x = qFree.Dequeue();
            x.Token = token++;
            x.Delay1 = r.Next(10, 20);
            x.Delay2 = r.Next(30, 50);
            x.Task = RunTask3Async(x, ticket, qFree);
            Console.WriteLine($"[{x.Token}] submit");
            submit++;
        }
        else await Task.WhenAny(taskPool.Select(x => x.Task));
    }
    await Task.WhenAll(taskPool.Select(x => x.Task));
    Console.WriteLine($"-------- Task3 finished --------");
}
static async Task RunTask3Async(TaskCtx x, Ticket ticket, Queue<TaskCtx> qFree)
{
    Console.WriteLine($"[{x.Token}] start");
    await Task.Delay(x.Delay1);
    Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
    while (ticket.ticket != x.Token)
    {
        await Task.Yield(); // yield and wait other task increases the ticket
    }
    Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
    await Task.Delay(x.Delay2);
    Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
    ticket.ticket++;
    qFree.Enqueue(x);
}
public class Ticket
{
    public int ticket;
    public Ticket(int t)
    {
        ticket = t;
    }
    public override string ToString()
    {
        return $"{ticket}";
    }
}
Task3() also works; the output is:
-------- Task3 start --------
[0] start
[0] submit
[1] start
[1] submit
[2] start
[2] submit
[3] start
[3] submit
[0] wait ticket: 0
[3] wait ticket: 0
[0] acquire ticket: 0
[2] wait ticket: 0
[1] wait ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1
[4] start
[4] submit
[4] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[5] start
[5] submit
[5] wait ticket: 2
[2] release ticket: 2
[3] acquire ticket: 3
[6] start
[6] submit
[6] wait ticket: 3
[3] release ticket: 3
[4] acquire ticket: 4
[4] release ticket: 4
[5] acquire ticket: 5
[5] release ticket: 5
[6] acquire ticket: 6
[6] release ticket: 6
-------- Task3 finished --------
UPDATE2: I tried to minimize the example, but the following code does not show the problem.
var FTasks = Enumerable.Range(0, 8).Select(i => async () =>
{
    Console.WriteLine($"[{i}] start");
    await Task.Delay(i);
    Console.WriteLine($"[{i}] end");
}).ToArray();
await Task.WhenAll(FTasks.Select(x => x()));
Console.WriteLine("----------------");
var Tasks = Enumerable.Range(0, 8).Select(i => Task.Run(async () =>
{
    Console.WriteLine($"[{i}] start");
    await Task.Delay(i);
    Console.WriteLine($"[{i}] end");
})).ToArray();
await Task.WhenAll(Tasks);
Upvotes: 1
Views: 1357
Reputation: 61
The problem is exactly what Stephen describes in his answer: every time you invoke an async delegate, a new instance of the async operation is created and starts from the beginning.
This is a minimal example to illustrate the problem:
int i = 0;
var ftask = async () =>
{
    Console.WriteLine($"[{i++}] Func<Task>");
    await Task.Yield();
};
await ftask();
await ftask();
//--------------------
int j = 0;
var task = Task.Run(async () =>
{
    Console.WriteLine($"[{j++}] Task.Run()");
    await Task.Yield();
});
await task;
await task;
The output:
[0] Func<Task>
[1] Func<Task>
[0] Task.Run()
Upvotes: 1
Reputation: 456517
As others have noted, the main difference between Task and Func<Task> is that the first one represents an operation (that has already started), and the second one represents a delegate that starts an operation. This is the same logic as for any T and Func<T>.
So, the question of "why are these run more than once?" is answered by looking in your Task1:
else await Task.WhenAny(taskPool.Select(x => x.FTask()));
}
await Task.WhenAll(taskPool.Select(x => x.FTask()));
Every time your code calls FTask(), it starts a new asynchronous operation. So each call to Task.WhenAny will start all the tasks in the taskPool, and the final call to Task.WhenAll will start all the tasks in taskPool again.
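One way to avoid the repeated starts, sketched here under assumptions rather than as the asker's final code, is to invoke each delegate exactly once and keep the Task it returns. The names work, running and starts below are hypothetical and exist only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class StartOnceDemo
{
    static async Task Main()
    {
        int starts = 0; // hypothetical counter: how many operations were started

        // Hypothetical delegate standing in for TaskCtx.FTask.
        Func<Task> work = async () =>
        {
            starts++;             // runs synchronously, before the first await
            await Task.Delay(10);
        };

        // Invoke the delegate once per slot and materialize the resulting tasks.
        // ToArray() matters: without it, each enumeration would call work() again.
        Task[] running = Enumerable.Range(0, 4).Select(_ => work()).ToArray();

        // Awaiting stored tasks any number of times does not restart them.
        await Task.WhenAny(running);
        await Task.WhenAll(running);

        Console.WriteLine(starts); // prints 4
    }
}
```

Task2() in the question already follows this pattern, because it stores the Task returned by Task.Run() rather than the delegate.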
Upvotes: 4
Reputation: 706
The main difference between storing Task.Run(...) in a Task and assigning the lambda to a Func is in the timing of execution. Task.Run() starts the work immediately, whereas the function is lazy, i.e. the work starts only when the function is called.
Upvotes: 2
Reputation: 36371
This question essentially boils down to the difference between calling
public Task MyMethod();
and calling
public void MyMethod()
using Task.Run. The essential difference is that the second will explicitly be run on a background thread. The first will run on the current thread up until the first await. At this point it may continue synchronously (if the awaited operation has already completed), or schedule a 'continuation' to run later. This continuation runs on the synchronization context captured from the calling thread, which means that in UI programs it will run on the UI thread if called from a UI thread.
For a console program like yours, there will be little practical difference, since one of the first operations is a Task.Delay, and since you do not have a UI, the continuation will run on a thread pool thread.
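A small console sketch can make the threading behaviour visible. It assumes a plain console app with no synchronization context; the method name Direct is hypothetical. The thread IDs printed vary between runs, so no expected output is shown. The one reliable observation is that the line printed before the await in Direct reports the same thread as Main.

```csharp
using System;
using System.Threading.Tasks;

class ThreadDemo
{
    // Hypothetical async method called directly, without Task.Run.
    static async Task Direct()
    {
        // Runs synchronously on the caller's thread up to the first await.
        Console.WriteLine($"Direct, before await: thread {Environment.CurrentManagedThreadId}");
        await Task.Delay(10);
        // In a console app the continuation is scheduled on the thread pool.
        Console.WriteLine($"Direct, after await: thread {Environment.CurrentManagedThreadId}");
    }

    static async Task Main()
    {
        Console.WriteLine($"Main: thread {Environment.CurrentManagedThreadId}");
        await Direct();

        // Task.Run queues the delegate to the thread pool from the start.
        await Task.Run(() =>
            Console.WriteLine($"Task.Run: thread {Environment.CurrentManagedThreadId}"));
    }
}
```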
Upvotes: 1