jmasterx
jmasterx

Reputation: 54103

Sequential async task queue does not wait until async task ends

I am trying to sequentially queue async tasks so I made a class for this:

 public class TaskSequentialQueue : IDisposable, ITaskSequentialQueue
    {
        public delegate void OnExeptionDelegate(Exception ex);

        private readonly Queue<Task> m_queue = new Queue<Task>();
        private readonly Object m_lock = new Object();
        private readonly CancellationTokenSource m_CancelToken = new CancellationTokenSource();
        private readonly OnExeptionDelegate m_onExeptionDelegate = null;
        private Task m_currentTask = null;
        private bool m_isDisposed = false;      

        public TaskSequentialQueue(OnExeptionDelegate expDelegate = null)
        {
            m_onExeptionDelegate = expDelegate;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool isDisposing)
        {
            if (m_isDisposed)
            {
                return;
            }

            if (isDisposing)
            {
                lock (m_lock)
                {
                    m_isDisposed = true;
                    m_queue.Clear();
                }

                m_CancelToken.Cancel();
                m_CancelToken.Dispose();
            }
        }

        public void EnqueueTask( Task task)
        {
            lock (m_lock)
            {
                if (m_isDisposed)
                    throw new ObjectDisposedException("TaskSequentialQueue");

                m_queue.Enqueue(task);
            }

            StartNextTask();
        }

        public void EnqueueTask( Func<Task> task)
        {
            EnqueueTask(new Task<Task>(task));
        }

        public Task EnqueueTaskAndWait( Task task)
        {
            TaskCompletionSource<int> taskSource = new TaskCompletionSource<int>();
            lock (m_lock)
            {
                if (m_isDisposed)
                    throw new ObjectDisposedException("TaskSequentialQueue");

                Func<Task> toDo = async () =>
                {                                        
                    var waitabletask = task.ContinueWith( antecedent => 
                                                            { 
                                                                taskSource.SetResult(0); 
                                                                if (antecedent.Exception != null)
                                                                    throw antecedent.Exception; 
                                                            });
                    task.Start();
                    await waitabletask;
                };
                this.EnqueueTask(toDo);
            }

            StartNextTask();

            return taskSource.Task;  //TODO! propagate the exception correctly ?
        }

        private void StartNextTask()
        {
            Task theTask = null;
            lock(m_lock)
            {
                if (m_currentTask == null && m_queue.Count > 0 && !m_isDisposed)
                {
                    m_currentTask = m_queue.Dequeue();
                    theTask = m_currentTask;
                }
            }      
            if (theTask != null)
            {
                theTask.Start();  
                theTask.ContinueWith( (antecedent) =>
                    {
                        Exception theEx = antecedent.Exception;
                        if (theEx == null && antecedent is Task<Task>)
                            theEx = (antecedent as Task<Task>)?.Result.Exception;

                        if (m_onExeptionDelegate != null && theEx != null)
                        {
                            try { m_onExeptionDelegate(theEx); } catch(Exception) {}
                        }

                        lock(m_lock)
                        {
                            m_currentTask = null;
                        }

                        Task.Run( () =>  StartNextTask() );
                }
            }
        }
    }

I use it like this:

     Func<Task> action = async () =>
                {
                   Log("Entered");
                   await Task.Delay(5000);
                   Log("Exited");
                }

m_taskSequentialQueue.EnqueueTask( action );
m_taskSequentialQueue.EnqueueTask( action );

I expect my log to read:

Entered
Exited
Entered
Exited

Instead I get:

Entered
Entered
Exited
Exited

I'm not sure what I'm doing wrong.

Thanks

Upvotes: 1

Views: 119

Answers (2)

huseyin tugrul buyukisik
huseyin tugrul buyukisik

Reputation: 11910

        Func<Task> action = async () =>
        {
            lock (lock_x)
            {
                Console.WriteLine("Entered");
                Thread.Sleep(5000);
                Console.WriteLine("Exited");
            }
        };

should work since your queue implementation is submitting them in order.

Upvotes: 0

Scott Chamberlain
Scott Chamberlain

Reputation: 127543

When you do theTask.ContinueWith( in StartNextTask the thing you are continuing on is the starting of the inner task not the completion of the inner task. Once the inner task hits the first await the theTask task will be considered complete because the function returned.

As a band-aid you could do

if (theTask != null)
{
    theTask.Start();  
    if(theTask is Task<Task>)
    {
       theTask = ((Task<Task>)theTask).Unwrap();
    }
    theTask.ContinueWith(...

However I think your whole method of using "Cold Tasks" is flawed. You should just work with a Queue<Func<Task>> instead of a Queue<Task> and it will allow your code to be much simpler.

Upvotes: 4

Related Questions