Daniel Park

Reputation: 4021

Running Task<T> on a custom scheduler

I am creating a generic helper class that will help prioritise requests made to an API whilst restricting the degree of parallelism with which they run.

Consider the key method of the class below:

    public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(Func<TClient, Task<TResponse>> invocation, QueuedClientPriority priority, CancellationToken ct) where TResponse : IServiceResponse
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _logger.Debug("Queueing task.");
        var taskToQueue = Task.Factory.StartNew(async () =>
        {
            _logger.Debug("Starting  request {0}", Task.CurrentId);
            return await invocation(_client);
        }, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
        taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
        return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
    }

Without going into too many details, I want to invoke the tasks returned by the Func<TClient, Task<TResponse>> invocation only when their turn in the queue arrives. I am using a collection of queues created with a QueuedTaskScheduler, indexed by a priority enumeration:

        _queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
        _schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
        //Enumerate the priorities
        foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
        {
            _schedulers.Add((QueuedClientPriority)priority, _queuedTaskScheduler.ActivateNewQueue((int)priority));
        }

However, I can't get the tasks to execute with limited parallelism: all 100 API requests are constructed, fired, and completed in one big batch. I can confirm this in a Fiddler session.


I have read some interesting articles and SO posts (here, here and here) that I thought would explain how to do this, but so far I have not been able to figure it out. From what I understand, the async lambda behaves as designed: once it reaches its first await, the rest of its body runs as a continuation, so the task that actually runs on the scheduler is marked complete almost immediately (it "insta-completes"). This means that whilst the queues are working fine, running a generated Task<T> on a custom scheduler is turning out to be the problem.

Upvotes: 2

Views: 805

Answers (2)

svick

Reputation: 244817

Stephen Cleary's answer explains well why you can't use TaskScheduler for this purpose and how you can use ActionBlock to limit the degree of parallelism. But if you want to add priorities to that, I think you'll have to do it manually. Your approach of using a Dictionary of queues is reasonable. A simple implementation of it (with no support for cancellation or completion) could look something like this:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Assumes an enum Priority in which higher values mean higher priority.
    class Scheduler
    {
        private static readonly Priority[] Priorities =
            (Priority[])Enum.GetValues(typeof(Priority));

        private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
        private readonly ActionBlock<Func<Task>> executor;
        private readonly SemaphoreSlim semaphore;

        public Scheduler(int degreeOfParallelism)
        {
            queues = Priorities.ToDictionary(
                priority => priority, _ => new ConcurrentQueue<Func<Task>>());

            // The delegate returns the invocation's Task, so an item occupies a slot
            // until that Task completes; BoundedCapacity keeps waiting items in the queues.
            executor = new ActionBlock<Func<Task>>(
                invocation => invocation(),
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = degreeOfParallelism,
                    BoundedCapacity = degreeOfParallelism
                });

            // Counts the invocations waiting in the queues.
            semaphore = new SemaphoreSlim(0);

            Task.Run(Watch);
        }

        private async Task Watch()
        {
            while (true)
            {
                await semaphore.WaitAsync();

                // find item with highest priority and send it for execution
                foreach (var priority in Priorities.Reverse())
                {
                    Func<Task> invocation;
                    if (queues[priority].TryDequeue(out invocation))
                    {
                        await executor.SendAsync(invocation);
                        break; // one semaphore release corresponds to one queued item
                    }
                }
            }
        }

        public void Invoke(Func<Task> invocation, Priority priority)
        {
            queues[priority].Enqueue(invocation);
            semaphore.Release(1);
        }
    }

Upvotes: 1

Stephen Cleary

Reputation: 456537

This means that whilst the queues are working fine, running a generated Task<T> on a custom scheduler is turning out to be the problem.

Correct. One way to think about it[1] is that an async method is split into several tasks: it's broken up at each await point. Each of these "sub-tasks" is then run on the task scheduler. So, the async method will run entirely on the task scheduler (assuming you don't use ConfigureAwait(false)), but at each await it will leave the task scheduler, and then re-enter that task scheduler after the await completes.
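To see the effect in isolation, here is a small self-contained sketch. It is not the question's code: it swaps in the BCL's ConcurrentExclusiveSchedulerPair (whose ConcurrentScheduler runs at most maxConcurrency tasks at once) for QueuedTaskScheduler, uses Task.Delay as a stand-in for the API call, and the class name SchedulerLeakDemo is hypothetical. Even though the scheduler only allows two tasks at a time, each async lambda gives its slot back at the await, so the reported peak of in-flight "requests" should be well above 2.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerLeakDemo
{
    // Starts `count` async "requests" on a scheduler limited to `maxConcurrency`
    // and returns the peak number of requests that were in flight simultaneously.
    public static async Task<int> RunAsync(int count, int maxConcurrency)
    {
        TaskScheduler limited =
            new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;

        int inFlight = 0, peak = 0;
        var gate = new object();

        Task[] requests = Enumerable.Range(0, count).Select(_ =>
            Task.Factory.StartNew(async () =>
            {
                lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                // The scheduler slot is released here: the delay (standing in for
                // the HTTP call) does not occupy the limited scheduler.
                await Task.Delay(200);
                lock (gate) { inFlight--; }
            }, CancellationToken.None, TaskCreationOptions.None, limited).Unwrap()).ToArray();

        await Task.WhenAll(requests);
        return peak;
    }

    public static void Main()
    {
        int peak = RunAsync(10, 2).GetAwaiter().GetResult();
        Console.WriteLine($"Peak concurrent requests with a 2-task scheduler: {peak}");
    }
}
```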

So, if you want to coordinate asynchronous work at a higher level, you need to take a different approach. It's possible to write the code yourself for this, but it can get messy. I recommend you first try ActionBlock<T> from the TPL Dataflow library, passing your custom task scheduler to its ExecutionDataflowBlockOptions.
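A minimal sketch of the ActionBlock<T> approach, assuming a reference to the TPL Dataflow library (System.Threading.Tasks.Dataflow). The names ActionBlockDemo and RunAsync are hypothetical, and Task.Delay again stands in for the real API call. The important detail is that the block is given an async delegate, so each item counts against MaxDegreeOfParallelism until the Task it returns completes, not merely until its first await.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Pushes `requestCount` simulated requests through an ActionBlock limited to
    // `maxParallel` and returns the peak number that were in flight at once.
    public static async Task<int> RunAsync(int requestCount, int maxParallel)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();

        var block = new ActionBlock<int>(async requestId =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50); // stands in for the API request
            lock (gate) { inFlight--; }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        for (int i = 0; i < requestCount; i++)
            block.Post(i);

        // Signal that no more items will arrive, then wait for the queue to drain.
        block.Complete();
        await block.Completion;
        return peak;
    }

    public static void Main()
    {
        int peak = RunAsync(20, 3).GetAwaiter().GetResult();
        Console.WriteLine($"Peak concurrent requests: {peak}");
    }
}
```

If a specific scheduler is still wanted, ExecutionDataflowBlockOptions also exposes a TaskScheduler property; in this sketch it is MaxDegreeOfParallelism alone that throttles the requests.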

[1] This is a simplification. The state machine will avoid creating actual task objects unless necessary (in this case, they are necessary because they're being scheduled to a task scheduler). Also, only await points where the awaitable isn't complete actually cause a "method split".
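The footnote's last point can be observed with a scheduler that counts how often work is queued to it. CountingScheduler and SplitDemo below are hypothetical helpers written for this illustration; the scheduler never runs tasks inline, so every continuation has to pass through QueueTask. Awaiting an already-completed task should leave the count at 1 (just the initial task), while awaiting an incomplete Task.Delay should add a second queued piece for the continuation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts queued tasks and refuses inline execution.
public sealed class CountingScheduler : TaskScheduler
{
    private int _queued;
    public int Queued => Volatile.Read(ref _queued);

    protected override void QueueTask(Task task)
    {
        Interlocked.Increment(ref _queued);
        ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => Array.Empty<Task>();
}

public static class SplitDemo
{
    // Runs an async body on a fresh CountingScheduler and returns how many
    // pieces of it were queued to that scheduler.
    public static int CountPieces(Func<Task> body)
    {
        var scheduler = new CountingScheduler();
        Task.Factory.StartNew(body, CancellationToken.None, TaskCreationOptions.None, scheduler)
            .Unwrap()
            .GetAwaiter().GetResult();
        return scheduler.Queued;
    }

    public static void Main()
    {
        // Completed awaitable: no split, the method runs straight through.
        Console.WriteLine(CountPieces(async () => { await Task.CompletedTask; }));
        // Incomplete awaitable: the continuation is queued back to the scheduler.
        Console.WriteLine(CountPieces(async () => { await Task.Delay(50); }));
    }
}
```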

Upvotes: 3
