Daniel Leiszen

Reputation: 1897

Writing and reading to/from gRPC full duplex channel in .NET simultaneously using async enumerables

I have a client-server scenario with a pipeline fully implemented with gRPC duplex streams. The pipeline is initiated by the client. At every step (duplex call) the client reads items from an IAsyncEnumerable and writes them on the channel. The items are processed by the server and some of them are sent back on the channel asynchronously. Then the client returns the read items with yield. I have 4 methods chained up like that.

The pattern I use:

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    await foreach (var input in inputs)
    {
        await duplex.RequestStream.WriteAsync(input);
    }

    await duplex.RequestStream.CompleteAsync();

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }
}

The problem with this pattern is that the server starts sending data back before the second foreach (the one reading from the channel) begins. If there is a lot of data, the channel buffer presumably fills up before reading starts, and gRPC throws an Operation cancelled exception.

How can I refactor this without breaking the usage of IAsyncEnumerables and yield? If that is not possible, then how else can I achieve a "Write first, read next" scenario?

Just for clarification, I cannot use the following pattern:

while(await stream.MoveNext()) 
{
   // send
   // yield
}

The moment I call await duplex.ResponseStream.MoveNext(), the client waits for the server to return data. But it is unpredictable when, and for which inputs, the server returns data, so that would cause a deadlock.

I would need something like IsDataAvailableOnChannel, but no such method exists on AsyncDuplexStreamingCall.

Upvotes: 0

Views: 2331

Answers (1)

Charlieface

Reputation: 71579

You can hand off the sending of data to another task, then await it once you have finished yielding.

Assuming you have a normal ThreadPool scheduler, the awaits in the sending loop and in the reading loop should interleave, so responses are drained while requests are still being written.

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    var sendingTask = Task.Run(async () =>
    {
        await foreach (var input in inputs)
        {
            await duplex.RequestStream.WriteAsync(input);
        }

        await duplex.RequestStream.CompleteAsync();
    });

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }

    await sendingTask;
}
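To try the idea without a gRPC server, here is a minimal sketch that simulates the duplex call with two bounded channels from System.Threading.Channels. Everything in it is an assumption made for illustration: the DuplexDemo class, the echo "server", the int/string message types and the buffer capacity of 2 are hypothetical and not part of gRPC. A bounded channel makes a full buffer wait rather than throw, so it only mimics the back-pressure, not the exact gRPC exception.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DuplexDemo
{
    // Hypothetical stand-in for the server: sends one response per request.
    private static async Task EchoServerAsync(
        ChannelReader<int> requests, ChannelWriter<string> responses)
    {
        await foreach (var request in requests.ReadAllAsync())
        {
            // Waits here whenever the bounded response buffer is full.
            await responses.WriteAsync($"result {request}");
        }
        responses.Complete();
    }

    // Produces the inputs 0 .. count-1 asynchronously.
    private static async IAsyncEnumerable<int> InputsAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async IAsyncEnumerable<string> ResultsAsync(IAsyncEnumerable<int> inputs)
    {
        // Small capacities mimic limited buffering in each direction (assumed values).
        var requests = Channel.CreateBounded<int>(2);
        var responses = Channel.CreateBounded<string>(2);
        var serverTask = EchoServerAsync(requests.Reader, responses.Writer);

        // The sender runs concurrently with the read loop below.
        var sendingTask = Task.Run(async () =>
        {
            await foreach (var input in inputs)
            {
                await requests.Writer.WriteAsync(input);
            }
            requests.Writer.Complete();
        });

        // Draining responses here keeps the simulated server from stalling.
        await foreach (var result in responses.Reader.ReadAllAsync())
        {
            yield return result;
        }

        await sendingTask;
        await serverTask;
    }

    // Collects all results into a list so they can be inspected.
    public static async Task<List<string>> CollectAsync(int count)
    {
        var results = new List<string>();
        await foreach (var result in ResultsAsync(InputsAsync(count)))
        {
            results.Add(result);
        }
        return results;
    }

    public static async Task Main()
    {
        var results = await CollectAsync(100);
        Console.WriteLine(results.Count);
    }
}
```

If the sending loop in this sketch were awaited to completion before the read loop started, as in the question, both buffers would fill after a few items and neither side could make progress.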

It might be easier to manage if you place the anonymous lambda into a proper function.
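A possible shape for that refactoring, as a sketch rather than a definitive implementation: the DuplexPump class and the SendAllAsync and PumpAsync names are hypothetical, and the code assumes the Grpc.Core.Api package, using its ReadAllAsync extension in place of the ToAsyncEnumerable helper from the question.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Grpc.Core;

public static class DuplexPump
{
    // Writes every input to the request stream, then signals end of requests.
    private static async Task SendAllAsync<TRequest>(
        IAsyncEnumerable<TRequest> inputs,
        IClientStreamWriter<TRequest> requestStream)
    {
        await foreach (var input in inputs)
        {
            await requestStream.WriteAsync(input);
        }
        await requestStream.CompleteAsync();
    }

    // Sends on a separate task while yielding responses as they arrive.
    public static async IAsyncEnumerable<TResponse> PumpAsync<TRequest, TResponse>(
        IAsyncEnumerable<TRequest> inputs,
        IClientStreamWriter<TRequest> requestStream,
        IAsyncStreamReader<TResponse> responseStream)
    {
        var sendingTask = Task.Run(() => SendAllAsync(inputs, requestStream));

        await foreach (var result in responseStream.ReadAllAsync())
        {
            yield return result;
        }

        // Surfaces any exception thrown by the sending loop.
        await sendingTask;
    }
}
```

Each of the four pipeline steps could then reduce to something like `return DuplexPump.PumpAsync(inputs, duplex.RequestStream, duplex.ResponseStream);`. Note that if the read loop throws first, sendingTask is never awaited in this sketch, so a production version may want a try/finally around the loop.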

Upvotes: 1
