Reputation: 54103
I am trying to sequentially queue async tasks so I made a class for this:
public class TaskSequentialQueue : IDisposable, ITaskSequentialQueue
{
public delegate void OnExeptionDelegate(Exception ex);
private readonly Queue<Task> m_queue = new Queue<Task>();
private readonly Object m_lock = new Object();
private readonly CancellationTokenSource m_CancelToken = new CancellationTokenSource();
private readonly OnExeptionDelegate m_onExeptionDelegate = null;
private Task m_currentTask = null;
private bool m_isDisposed = false;
public TaskSequentialQueue(OnExeptionDelegate expDelegate = null)
{
m_onExeptionDelegate = expDelegate;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool isDisposing)
{
if (m_isDisposed)
{
return;
}
if (isDisposing)
{
lock (m_lock)
{
m_isDisposed = true;
m_queue.Clear();
}
m_CancelToken.Cancel();
m_CancelToken.Dispose();
}
}
public void EnqueueTask( Task task)
{
lock (m_lock)
{
if (m_isDisposed)
throw new ObjectDisposedException("TaskSequentialQueue");
m_queue.Enqueue(task);
}
StartNextTask();
}
public void EnqueueTask( Func<Task> task)
{
EnqueueTask(new Task<Task>(task));
}
public Task EnqueueTaskAndWait( Task task)
{
TaskCompletionSource<int> taskSource = new TaskCompletionSource<int>();
lock (m_lock)
{
if (m_isDisposed)
throw new ObjectDisposedException("TaskSequentialQueue");
Func<Task> toDo = async () =>
{
var waitabletask = task.ContinueWith( antecedent =>
{
taskSource.SetResult(0);
if (antecedent.Exception != null)
throw antecedent.Exception;
});
task.Start();
await waitabletask;
};
this.EnqueueTask(toDo);
}
StartNextTask();
return taskSource.Task; //TODO! propagate the exception correctly ?
}
private void StartNextTask()
{
Task theTask = null;
lock(m_lock)
{
if (m_currentTask == null && m_queue.Count > 0 && !m_isDisposed)
{
m_currentTask = m_queue.Dequeue();
theTask = m_currentTask;
}
}
if (theTask != null)
{
theTask.Start();
theTask.ContinueWith( (antecedent) =>
{
Exception theEx = antecedent.Exception;
if (theEx == null && antecedent is Task<Task>)
theEx = (antecedent as Task<Task>)?.Result.Exception;
if (m_onExeptionDelegate != null && theEx != null)
{
try { m_onExeptionDelegate(theEx); } catch(Exception) {}
}
lock(m_lock)
{
m_currentTask = null;
}
Task.Run( () => StartNextTask() );
}
}
}
}
I use it like this:
Func<Task> action = async () =>
{
Log("Entered");
await Task.Delay(5000);
Log("Exited");
}
m_taskSequentialQueue.EnqueueTask( action );
m_taskSequentialQueue.EnqueueTask( action );
I expect my log to read:
Entered
Exited
Entered
Exited
Instead I get:
Entered
Entered
Exited
Exited
I'm not sure what I'm doing wrong.
Thanks
Upvotes: 1
Views: 119
Reputation: 11910
Func<Task> action = async () =>
{
lock (lock_x)
{
Console.WriteLine("Entered");
Thread.Sleep(5000);
Console.WriteLine("Exited");
}
};
should work since your queue implementation is submitting them in order.
Upvotes: 0
Reputation: 127543
When you do theTask.ContinueWith(
in StartNextTask
the thing you are continuing on is the starting of the inner task not the completion of the inner task. Once the inner task hits the first await
the theTask
task will be considered complete because the function returned.
As a band-aid you could do
if (theTask != null)
{
theTask.Start();
if(theTask is Task<Task>)
{
theTask = ((Task<Task>)theTask).Unwrap();
}
theTask.ContinueWith(...
However I think your whole method of using "Cold Tasks" is flawed. You should just work with a Queue<Func<Task>>
instead of a Queue<Task>
and it will allow your code to be much simpler.
Upvotes: 4