morleyc

Reputation: 2451

Implementation of multi-threaded consumer by message shard ID

I have a command processor that I am rewriting to be sharded across a number of threads. I am moving away from a single-threaded sequential sender, where a delegate func is passed into the method, which then decides how to call it:

internal class SingleThreadedSendStrategy
{
    public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken)
    {
        return func(request, cancellationToken);
    }
}

To a multi-threaded setup (n threads) where each thread has its own queue, and each command is appended to a specific queue chosen by the message's hash code, as below:

internal class ShardedSendStrategy
{
    const int QueueCount = 8;
    readonly BlockingCollection<Task>[] Queues = new BlockingCollection<Task>[QueueCount];
    readonly CancellationToken CancellationToken;

    public ShardedSendStrategy(CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;

        for (int i = 0; i < QueueCount; i++)
        {
            Queues[i] = new BlockingCollection<Task>();
            var thread = new Thread( () => OnHandlerStart(Queues[i])) { IsBackground = true };
            thread.Start();
        }
    }

    public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken = default)
    {
        var shard = request.GetHashCode() % QueueCount;

        var task = new Task(() => func(request, cancellationToken));
        Queues[shard].Add(task);

        return task; // we have a problem here, Task<TResponse> is expected
    }

    private void OnHandlerStart(BlockingCollection<Task> queue)
    {
        foreach (var job in queue.GetConsumingEnumerable(CancellationToken))
        {
            job.Start();
        }
    }
}

However, if in the Send function I create the task with var task = new Task(() => func(request, cancellationToken)); I cannot return it, because the compiler reports: Cannot implicitly convert type 'System.Threading.Tasks.Task' to 'System.Threading.Tasks.Task<TResponse>'.

If instead I set var task = new Task<TResponse>( () => { return func(request, cancellationToken); }); in the Send function, the compiler reports: Cannot convert lambda expression to intended delegate type because some of the return types in the block are not implicitly convertible to the delegate return type.

How can I resolve these conversion errors so that the job gets dequeued and run in the consumer? (I am not fussed about any specific object types, since the function to call is set in the Task constructor.)

Also, how would I communicate back to the thread that called Send that the function has completed in the OnHandlerStart method?

Are there any other best practices I should be following for this multi-queue consumer setup? If I am reinventing the wheel, some code for this use case based on existing TPL structures would be much appreciated.

Upvotes: 1

Views: 118

Answers (2)

Theodor Zoulias

Reputation: 43812

Here is how you could use the ActionBlock class from the TPL Dataflow library to process the requests and get a task for each request. Basically you just need to pass a TaskCompletionSource<TResponse> along with each request. Pairing them together using a ValueTuple is convenient:

public class ActionTaskBlock<TRequest, TResponse>
{
    private readonly ActionBlock<(TRequest,
        TaskCompletionSource<TResponse>)> _actionBlock;

    /// <summary>Initializes a new instance of the
    /// <see cref="ActionTaskBlock{TRequest,TResponse}"/> class with the
    /// specified process delegate, cancellation token, and max degree of
    /// parallelism.</summary>
    public ActionTaskBlock(Func<TRequest, CancellationToken, TResponse> process,
        CancellationToken cancellationToken, int maxDegreeOfParallelism)
    {
        _actionBlock = new ActionBlock<
            (TRequest Request, TaskCompletionSource<TResponse> TCS)>(entry =>
        {
            try
            {
                var response = process(entry.Request, cancellationToken);
                entry.TCS.SetResult(response);
            }
            catch (OperationCanceledException)
            {
                entry.TCS.TrySetCanceled();
            }
            catch (Exception ex)
            {
                entry.TCS.TrySetException(ex);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
        });
    }

    /// <summary>Signals to the block that it shouldn't accept any more
    /// requests.</summary>
    public void Complete() => _actionBlock.Complete();

    /// <summary>Gets a <see cref="Task"/> object that represents the
    /// asynchronous operation and completion of the block.</summary>
    public Task Completion => _actionBlock.Completion;

    /// <summary>Schedules a <typeparamref name="TRequest"/> for processing
    /// by the block.</summary>
    public async Task<TResponse> ProcessAsync(TRequest request)
    {
        var tsc = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        await _actionBlock.SendAsync((request, tsc)).ConfigureAwait(false);
        return await tsc.Task.ConfigureAwait(false);
    }
}

Usage example:

// Create the block
var cts = new CancellationTokenSource(2000); // Cancel after 2000 msec
var block = new ActionTaskBlock<int, int>((item, token) =>
{
    Console.WriteLine($"Start processing {item}");
    Task.WhenAny(Task.Delay(1000, token)).Wait(); // Sleep safely for 1000 msec
    token.ThrowIfCancellationRequested();
    return item * 2;
}, cts.Token, maxDegreeOfParallelism: 2); // Process no more than 2 at a time

// Feed the block with one request every 300 msec
foreach (var i in Enumerable.Range(1, 10))
{
    Console.WriteLine($"Scheduling {i}");
    block.ProcessAsync(i).ContinueWith(t =>
    {
        Console.WriteLine($"Item {i} processed with status {t.Status}");
    });
    Thread.Sleep(300);
    if (cts.IsCancellationRequested) break;
}
block.Complete();

// Wait for the completion of all requests, or the cancellation of the token
Task.WhenAny(block.Completion).Wait(); // Safe waiting (doesn't throw)
Console.WriteLine($"The block finished with status {block.Completion.Status}");

Output:

Scheduling 1
Start processing 1
Scheduling 2
Start processing 2
Scheduling 3
Scheduling 4
Item 1 processed with status RanToCompletion
Start processing 3
Scheduling 5
Item 2 processed with status RanToCompletion
Start processing 4
Scheduling 6
Scheduling 7
Item 4 processed with status Canceled
Item 3 processed with status Canceled
The block finished with status Canceled

The constructor of the ActionTaskBlock class accepts only synchronous delegates. It would be fairly easy to implement an overload that accepts an asynchronous delegate as its argument. All blocks of the TPL Dataflow library are async-ready, meaning that they accept asynchronous delegates as well: async entry => { ...= await process(...
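As a rough sketch of what such an asynchronous variant might look like (the class name AsyncActionTaskBlock is hypothetical, and treating a declined request as canceled is an assumption added here, not part of the class above):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical async counterpart of ActionTaskBlock; a sketch, not a tested library class.
public class AsyncActionTaskBlock<TRequest, TResponse>
{
    private readonly ActionBlock<(TRequest Request,
        TaskCompletionSource<TResponse> TCS)> _actionBlock;

    public AsyncActionTaskBlock(
        Func<TRequest, CancellationToken, Task<TResponse>> process,
        CancellationToken cancellationToken, int maxDegreeOfParallelism)
    {
        _actionBlock = new ActionBlock<
            (TRequest Request, TaskCompletionSource<TResponse> TCS)>(async entry =>
        {
            try
            {
                // Await the asynchronous delegate instead of calling it synchronously.
                var response = await process(entry.Request, cancellationToken)
                    .ConfigureAwait(false);
                entry.TCS.TrySetResult(response);
            }
            catch (OperationCanceledException)
            {
                entry.TCS.TrySetCanceled();
            }
            catch (Exception ex)
            {
                entry.TCS.TrySetException(ex);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
        });
    }

    public void Complete() => _actionBlock.Complete();

    public Task Completion => _actionBlock.Completion;

    public async Task<TResponse> ProcessAsync(TRequest request)
    {
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        bool accepted = await _actionBlock.SendAsync((request, tcs))
            .ConfigureAwait(false);
        // Assumption: a request the block declines (already completed or
        // canceled) is reported to the caller as canceled.
        if (!accepted) tcs.TrySetCanceled();
        return await tcs.Task.ConfigureAwait(false);
    }
}
```

Requests that were accepted but are still sitting in the block's input queue when the token is canceled are discarded by the block, so their tasks would never complete in this sketch; a production version would need to account for that.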

Upvotes: 1

Luaan

Reputation: 63772

The Task constructor doesn't unwrap the task in the delegate. It isn't aware of it at all, and doesn't care about it. This is one of the things you get for using the "advanced scenario" - don't expect that new Task works the same as Task.Run, just without starting the delegate. It doesn't.
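To make that concrete, here is a small sketch (UnwrapDemo and ComputeAsync are made-up names for illustration). The outer task created with new Task completes as soon as the delegate returns the inner task, not when the inner task finishes:

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical async operation, used only for illustration.
    private static async Task<int> ComputeAsync()
    {
        await Task.Delay(200);
        return 42;
    }

    public static async Task<(bool InnerDoneEarly, int Result)> RunAsync()
    {
        // The delegate returns a Task<int>, so the outer task is a Task<Task<int>>.
        var outer = new Task<Task<int>>(() => ComputeAsync());
        outer.Start();

        // Awaiting the outer task only waits for the delegate to return.
        Task<int> inner = await outer;

        // The inner task is usually still running at this point (timing dependent).
        bool innerDoneEarly = inner.IsCompleted;

        // A second await is needed to get the actual result.
        int result = await inner;
        return (innerDoneEarly, result);
    }
}
```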

A more typical way of implementing what you're trying to do would be making your own task scheduler. But if you want to use your approach, you need to do the unwrapping manually. This can be a bit more complicated than it sounds, but fortunately, there's a method that does exactly what you need:

var task = new Task<Task<TResponse>>(() => func(request, cancellationToken)).Unwrap();

Less fortunately, this is still a promise-style task, and cannot be started. So you need to go even further:

internal class ShardedSendStrategy
{
  const int QueueCount = 8;
  readonly BlockingCollection<Action>[] Queues 
    = new BlockingCollection<Action>[QueueCount];
  readonly CancellationToken CancellationToken;

  public ShardedSendStrategy(CancellationToken cancellationToken)
  {
    CancellationToken = cancellationToken;

    for (int i = 0; i < QueueCount; i++)
    {
      var id = i; // copy: each thread's lambda must capture its own index
      Queues[id] = new BlockingCollection<Action>();
      var thread = new Thread( () => OnHandlerStart(Queues[id])) 
                             { IsBackground = true };
      thread.Start();
    }
  }

  public Task<TResponse> Send<TResponse>(
    Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, 
    IRequest<TResponse> request, CancellationToken cancellationToken = default)
  {
    // GetHashCode() can be negative; clear the sign bit to keep the index valid.
    var shard = (request.GetHashCode() & int.MaxValue) % QueueCount;
    var tcs = new TaskCompletionSource<TResponse>();

    Queues[shard].Add(() => {
      var result = func(request, cancellationToken).Result;

      tcs.SetResult(result);
    });

    return tcs.Task;
  }

  private void OnHandlerStart(BlockingCollection<Action> queue)
  {
    foreach (var job in queue.GetConsumingEnumerable(CancellationToken))
    {
      job();
    }
  }
}

The main problem is that this waits for the result of the task synchronously on your background thread. This can be a problem if there are dependencies between your tasks - you could get a deadlock where one task is waiting for another task to do its job, but that one can't get its work done because it's waiting for a spot in your "queue".

If you do decide to use a similar approach, make sure to add proper error handling. You need to handle exceptions, cancellation etc.
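One possible shape for that error handling, as a sketch only (SafeJob and Create are hypothetical names, and the operation signature is simplified so the example stands alone): the queued Action completes the TaskCompletionSource in every outcome, so the caller of Send is never left waiting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SafeJob
{
    // Hypothetical helper: wraps an async operation into an Action that can be
    // put on a BlockingCollection<Action>, plus the task the caller can await.
    public static (Action Job, Task<T> Task) Create<T>(
        Func<CancellationToken, Task<T>> operation, CancellationToken token)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Action job = () =>
        {
            try
            {
                token.ThrowIfCancellationRequested();
                // Still blocks the worker thread, like .Result does, but
                // GetAwaiter().GetResult() rethrows the original exception
                // instead of wrapping it in an AggregateException.
                T result = operation(token).GetAwaiter().GetResult();
                tcs.TrySetResult(result);
            }
            catch (OperationCanceledException)
            {
                tcs.TrySetCanceled();
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        };

        return (job, tcs.Task);
    }
}
```

In Send you would then add the returned Job to the shard's queue and return the Task to the caller.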

Again, implementing your own task scheduler is probably a better idea.
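For reference, a minimal sketch of that idea, assuming one dedicated thread per shard (SingleThreadTaskScheduler and ShardedScheduler are hypothetical names, and this is not tuned or hardened):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler that runs every task queued to it on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly Thread _thread;

    public SingleThreadTaskScheduler()
    {
        _thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true };
        _thread.Start();
    }

    public override int MaximumConcurrencyLevel => 1;

    protected override void QueueTask(Task task) => _tasks.Add(task);

    // Inline only on the scheduler's own thread, and only tasks not already queued.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        => Thread.CurrentThread == _thread
           && !taskWasPreviouslyQueued
           && TryExecuteTask(task);

    protected override IEnumerable<Task> GetScheduledTasks() => _tasks.ToArray();

    // Stops accepting work; the thread exits once the queue is drained.
    public void Dispose() => _tasks.CompleteAdding();
}

// Hypothetical front end: picks a scheduler by the key's hash code.
public sealed class ShardedScheduler
{
    private readonly TaskFactory[] _factories;

    public ShardedScheduler(int shardCount)
    {
        _factories = new TaskFactory[shardCount];
        for (int i = 0; i < shardCount; i++)
            _factories[i] = new TaskFactory(new SingleThreadTaskScheduler());
    }

    public Task<T> Run<T>(object key, Func<Task<T>> func)
    {
        // Clear the sign bit so a negative hash code cannot produce a negative index.
        int shard = (key.GetHashCode() & int.MaxValue) % _factories.Length;
        // StartNew yields a Task<Task<T>>; Unwrap flattens it to a Task<T>.
        return _factories[shard].StartNew(func).Unwrap();
    }
}
```

With this shape the shard thread is not blocked while func awaits, so a second request on the same shard can start before the first one finishes. If strict one-at-a-time processing per shard is required, that needs additional care.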

Upvotes: 2
