Goran
Goran

Reputation: 6518

Task<T> queueing

I have a following method

public async Task<T> SomeMethod(parameters)
{
    // here we execute some instructions which are not thread safe
}

I need SomeMethod to return a Task, so that other methods can run (await) it asynchronously, and not block the UI thread.

The problem is that SomeMethod can be called in parallel, since the execution is returned to UI thread, and that will raise exception, since some of the calls inside SomeMethod() are not thread safe.

What is the best way to ensure that all calls to SomeMethod are queued (and awaitable), and that this queue will be executed in sequence?

Upvotes: 2

Views: 142

Answers (1)

Zein Makki
Zein Makki

Reputation: 30022

Use AsyncLock to prevent two threads from executing a single block of code :

(A traditional lock will not work, because you can't use an await keyword inside of it)

private AsyncLock myAsyncLock = new AsyncLock();

public async Task<T> SomeMethod(parameters)
{
    using (await myAsyncLock.LockAsync())
    {
       // here we execute some instructions which are not thread safe
    }
}


public class AsyncLock
{
    private readonly AsyncSemaphore m_semaphore;
    private readonly Task<Releaser> m_releaser;

    public AsyncLock()
    {
        m_semaphore = new AsyncSemaphore(1);
        m_releaser = Task.FromResult(new Releaser(this));
    }

    public Task<Releaser> LockAsync()
    {
        var wait = m_semaphore.WaitAsync();
        return wait.IsCompleted ?
            m_releaser :
            wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                this, System.Threading.CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    public struct Releaser : IDisposable
    {
        private readonly AsyncLock m_toRelease;

        internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

        public void Dispose()
        {
            if (m_toRelease != null)
                m_toRelease.m_semaphore.Release();
        }
    }
}

// http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266983.aspx
public class AsyncSemaphore
{
    private readonly static Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waiters = new Queue<TaskCompletionSource<bool>>();
    private int m_currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
        m_currentCount = initialCount;
    }

    public Task WaitAsync()
    {
        lock (m_waiters)
        {
            if (m_currentCount > 0)
            {
                --m_currentCount;
                return s_completed;
            }
            else
            {
                var waiter = new TaskCompletionSource<bool>();
                m_waiters.Enqueue(waiter);
                return waiter.Task;
            }
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (m_waiters)
        {
            if (m_waiters.Count > 0)
                toRelease = m_waiters.Dequeue();
            else
                ++m_currentCount;
        }
        if (toRelease != null)
            toRelease.SetResult(true);
    }
}

Upvotes: 2

Related Questions