uriDium
uriDium

Reputation: 13420

Can you avoid Task Continuations when using async/await if you want execution to continue immediately

I am working on a protocol and trying to use as much async/await as I can to make it scale well. The protocol will have to support hundreds to thousands of simultaneous connections. Below is a little bit of pseudo code to illustrate my problem.

private static async void DoSomeWork()
{
    var protocol = new FooProtocol();
    await protocol.Connect("127.0.0.1", 1234);
    var i = 0;
    while(i != int.MaxValue)
    {
        i++;
        var request = new FooRequest();
        request.Payload = "Request Nr " + i;
        var task = protocol.Send(request);
        _ = task.ContinueWith(async tmp =>
        {
            var resp = await task;
            Console.WriteLine($"Request {resp.SequenceNr} Successful: {(resp.Status == 0)}");
         });

    }
}

And below is a little pseudo code for the protocol.

public class FooProtocol
{
    private int sequenceNr = 0;
    
    private SemaphoreSlim ss = new SemaphoreSlim(20, 20);
    
    public Task<FooResponse> Send(FooRequest fooRequest)
    {
        var tcs = new TaskCompletionSource<FooResponse>();
        ss.Wait();
        var tmp = Interlocked.Increment(ref sequenceNr);
        fooRequest.SequenceNr = tmp;
        // Faking some arbitrary delay. This work is done over sockets. 
        Task.Run(async () =>
        {
            await Task.Delay(1000);
            tcs.SetResult(new FooResponse() {SequenceNr = tmp});
            ss.Release();
        });
        return tcs.Task;

    }
}

I have a protocol with request and response pairs. I have used asynchronous socket programming. The FooProtocol will take care of matching up request with responses (sequence numbers) and will also take care of the maximum number of pending requests. (Done in the pseudo and my code with a semaphore slim, So I am not worried about run away requests). The DoSomeWork method calls the Protocol.Send method, but I don't want to await the response, I want to spin around and send the next one until I am blocked by the maximum number of pending requests. When the task does complete I want to check the response and maybe do some work.

I would like to fix two things

  1. I would like to avoid using Task.ContinueWith() because it seems to not fit in cleanly with the async/await patterns
  2. Because I have awaited on the connection, I have had to use the async modifier. Now I get warnings from the IDE "Because this call is not waited, execution of the current method continues before this call is complete. Consider applying the 'await' operator to the result of the call." I don't want to do that, because as soon as I do it ruins the protocol's ability to have many requests in flight. The only way I can get rid of the warning is to use a discard. Which isn't the worst thing but I can't help but feel like I am missing a trick and fighting this too hard.

Upvotes: 0

Views: 96

Answers (2)

Theodor Zoulias
Theodor Zoulias

Reputation: 43515

I am not sure that it's a good idea to enforce the maximum concurrency at the protocol level. It seems to me that this responsibility belongs to the caller of the protocol. So I would remove the SemaphoreSlim, and let it do the one thing that it knows to do well:

public class FooProtocol
{
    private int sequenceNr = 0;

    public async Task<FooResponse> Send(FooRequest fooRequest)
    {
        var tmp = Interlocked.Increment(ref sequenceNr);
        fooRequest.SequenceNr = tmp;
        await Task.Delay(1000); // Faking some arbitrary delay
        return new FooResponse() { SequenceNr = tmp };
    }
}

Then I would use an ActionBlock from the TPL Dataflow library in order to coordinate the process of sending a massive number of requests through the protocol, by handling the concurrency, the backpreasure (BoundedCapacity), the cancellation (if needed), the error-handling, and the status of the whole operation (running, completed, failed etc). Example:

private static async Task DoSomeWorkAsync()
{
    var protocol = new FooProtocol();

    var actionBlock = new ActionBlock<FooRequest>(async request =>
    {
        var resp = await protocol.Send(request);
        Console.WriteLine($"Request {resp.SequenceNr} Status: {resp.Status}");
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 20,
        BoundedCapacity = 100
    });

    await protocol.Connect("127.0.0.1", 1234);

    foreach (var i in Enumerable.Range(0, Int32.MaxValue))
    {
        var request = new FooRequest();
        request.Payload = "Request Nr " + i;
        var accepted = await actionBlock.SendAsync(request);
        if (!accepted) break; // The block has failed irrecoverably
    }
    actionBlock.Complete();
    await actionBlock.Completion; // Propagate any exceptions
}

The BoundedCapacity = 100 configuration means that the ActionBlock will store in its internal buffer at most 100 requests. When this threshold is reached, anyone who wants to send more requests to it will have to wait. The awaiting will happen in the await actionBlock.SendAsync line.

Upvotes: 1

Stephen Cleary
Stephen Cleary

Reputation: 456507

Side note: I hope your actual code is using SemaphoreSlim.WaitAsync rather than SemaphoreSlim.Wait.

In most socket code, you do end up with a list of connections, and along with each connection is a "processor" of some kind. In the async world, this is naturally represented as a Task.

So you will need to keep a list of Tasks; at the very least, your consuming application will need to know when it is safe to shut down (i.e., all responses have been received).

Don't preemptively worry about using Task.Run; as long as you aren't blocking (e.g., SemaphoreSlim.Wait), you probably will not starve the thread pool. Remember that during the awaits, no thread pool thread is used.

Upvotes: 1

Related Questions