Robert Schmid

Reputation: 71

Streaming gRPC call to ASP.NET Core app fails in Azure App Services

I've created a bidirectional streaming gRPC endpoint using .NET 7:

public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        _logger.LogInformation("Client sent state {}", message);
    }
}

I've deployed this code to Azure App Services, with Easy Auth Authentication enabled (the gRPC path is excluded from authentication), and configured the HTTP 2 settings as per the documentation. I'm using the same port in my app for regular HTTP 1 and HTTP 2 gRPC traffic (Kestrel->EndpointDefaults->Protocols == 'Http1AndHttp2'). Basic gRPC calls work without issues.
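
The Kestrel protocol setting described above typically corresponds to an appsettings.json fragment like the one below. This is a sketch reconstructed from the description; the project's actual configuration file is not shown here.

```json
{
  "Kestrel": {
    "EndpointDefaults": {
      "Protocols": "Http1AndHttp2"
    }
  }
}
```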

The streaming call, however, fails after 5-6 seconds with this exception:

fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'StreamCommands'.
      System.InvalidOperationException: Can't read messages after the request is complete.
         at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+MoveNext()
         at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
         at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
         at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
         at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.DuplexStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)

Azure App Services log:

fail: Middleware[0]
      Failed to forward request to http://169.254.130.5:8080. Encountered a System.Net.Http.HttpRequestException exception after 5356.990ms with message: An error occurred while sending the request.. Check application logs to verify the application is properly handling HTTP traffic.

I expected the streaming gRPC call to stay open, as it does when I test locally.

Upvotes: 1

Views: 389

Answers (1)

Sampath

Reputation: 3591

The code below shows how to implement gRPC streaming methods defined with protobuf, on the server side and on the client side.

Server-side code:

  • The following sample code handles streaming calls and writes responses asynchronously to the response stream.
  • It uses await foreach to read the stream of requests sent by the client.
    public class StockDataService : StockService.StockServiceBase
    {
        private readonly ILogger<StockDataService> _logger;
        private readonly IJWTAuthenticationManager _jWTAuthenticationManager;

        private static readonly List<Stock> _stocks = new List<Stock> 
        {
            {new Stock {StockId = "FB", StockName = "Facebook"} },
            {new Stock {StockId = "AAPL", StockName = "Apple"} },
            {new Stock {StockId = "AMZN", StockName = "Amazon"} },
            {new Stock {StockId = "NFLX", StockName = "Netflix"} },
            {new Stock {StockId = "MSFT", StockName = "Microsoft"} },
            {new Stock {StockId = "TSLA", StockName = "Tesla"} },
            {new Stock {StockId = "GOOG", StockName = "Alphabet"} }
        }; 

        public StockDataService(ILogger<StockDataService> logger, IJWTAuthenticationManager jWTAuthenticationManager)
        {
            _logger = logger;
            _jWTAuthenticationManager = jWTAuthenticationManager;
        }

        public override Task<StockListing> GetStockListings(Empty request, ServerCallContext context)
        {
            return Task.FromResult(new StockListing { Stocks = { _stocks }  });
        }

        public override Task<StockPrice> GetStockPrice(Stock request, ServerCallContext context)
        {
            var rnd = new Random(100);
            return Task.FromResult(
                new StockPrice
                {
                    Stock = _stocks.FirstOrDefault(x => x.StockId == request.StockId),
                    DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
                    Price = rnd.Next(100, 500).ToString()
                });
        }

        public override async Task GetStockPriceStream(Empty request, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
        {
            int i = 10;
            var rnd = new Random(100);
            while (!context.CancellationToken.IsCancellationRequested && i > 0)
            {
                // Write sequentially: a gRPC response stream allows only one write at a time,
                // and List.ForEach with an async lambda would not await the writes.
                foreach (var s in _stocks)
                {
                    var time = DateTime.UtcNow.ToTimestamp();
                    await responseStream.WriteAsync(new StockPrice
                    {
                        Stock = s,
                        DateTimeStamp = time,
                        Price = rnd.Next(100, 500).ToString()
                    });
                }

                // Count down so the stream ends after ten rounds, not only on cancellation.
                i--;
                await Task.Delay(500);
            }
        }

        public override async Task<StocksPrices> GetStocksPrices(IAsyncStreamReader<Stock> requestStream, ServerCallContext context)
        {
            var rnd = new Random(100);
            var inputStocksList = new List<Stock>();
            await foreach (var request in requestStream.ReadAllAsync())
            {
                inputStocksList.Add(request);
                _logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
            }

            var response = new StocksPrices();
            foreach (var inputStock in inputStocksList)
            {
                response.StockPriceList.Add(
                    new StockPrice
                    {
                        Stock = inputStock,
                        DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
                        Price = rnd.Next(100, 500).ToString()
                    });
            }

            return response;
        }

        public override async Task GetCompanyStockPriceStream(IAsyncStreamReader<Stock> requestStream, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
        {
            
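            // Channel that buffers prices so that only one task writes to the response stream.
            var channel = Channel.CreateUnbounded<StockPrice>();

            // Background task: drain the channel and write each price to the client.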
            _ = Task.Run(async () =>
            {
                await foreach (var stockPrice in channel.Reader.ReadAllAsync())
                {
                    await responseStream.WriteAsync(stockPrice);
                }
            });

       
            var getCompanyStockPriceStreamRequestTasks = new List<Task>();

            try
            {
               
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    _logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
                   
                    getCompanyStockPriceStreamRequestTasks.Add(GetStockPriceAsync(request));
                }

                _logger.LogInformation("Client finished streaming");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An exception occurred");
            }

           
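            // Wait for every per-stock producer to finish writing to the channel.
            await Task.WhenAll(getCompanyStockPriceStreamRequestTasks);

            // Signal that no more prices will be written.
            channel.Writer.TryComplete();

            // Wait until the channel is completed and all buffered prices have been read.
            await channel.Reader.Completion;

            _logger.LogInformation("Completed response streaming");

            // Local function: emits ten prices for one stock, one every 500 ms.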
            async Task GetStockPriceAsync(Stock stock)
            {
                var rnd = new Random(100);
                for (int i = 0; i < 10; i++)
                {
                    var time = DateTime.UtcNow.ToTimestamp();
                    await channel.Writer.WriteAsync(new StockPrice 
                    {
                        Stock = _stocks.FirstOrDefault(x => x.StockId == stock.StockId),
                        Price = rnd.Next(100, 500).ToString(),
                        DateTimeStamp = time
                    });

                    await Task.Delay(500);
                }
            }
        }

        [AllowAnonymous]
        public override Task<AuthResponse> Authenticate(ClientCred request, ServerCallContext context)
        {
            var token = _jWTAuthenticationManager.Authenticate(request.ClientId, request.ClientSecret);

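            // Returning null from a Task-returning method fails at runtime;
            // report the failure to the client with a gRPC status instead.
            if (token == null)
                throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid client credentials"));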

            return Task.FromResult(new AuthResponse { BearerToken = token });
        }
    }


  • code reference taken from git

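Applied to the handler from your question:

  • The code below reads the stream of requests, passing context.CancellationToken to ReadAllAsync, and writes a command to the response stream for each state it receives.
  • On the client side, use RequestStream.WriteAsync to send a stream of requests and complete the stream with RequestStream.CompleteAsync.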

public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
    try
    {
        await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
        {
            _logger.LogInformation("Client sent state {State}", message);

            // Process the received state and send a corresponding command.
            // CreateCommand is a hypothetical helper: replace it with your own logic
            // that builds a Command from the received State.
            var command = CreateCommand(message);
            await responseStream.WriteAsync(command);
        }
    }
    catch (OperationCanceledException)
    {
        // Handle cancellation (stream closed by client or other reasons)
        _logger.LogInformation("Stream closed by client or canceled.");
    }
    catch (Exception ex)
    {
        // Handle other exceptions
        _logger.LogError(ex, "Error processing streaming request.");
    }
}
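
A matching client could look roughly like the sketch below. It is only an illustration: CommandService.CommandServiceClient is a hypothetical name standing in for the client class generated from your .proto file, the address is a placeholder, and the number of states and the delay between them are arbitrary. It relies on the Grpc.Net.Client package and on the generated State and Command message types.

```csharp
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;

// Hypothetical names: replace the address and the generated client class with your own.
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new CommandService.CommandServiceClient(channel);

// Open the bidirectional stream.
using var call = client.StreamCommands();

// Read commands from the server in the background while states are being sent.
var readTask = Task.Run(async () =>
{
    await foreach (var command in call.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"Server sent command {command}");
    }
});

// Send a few states, then tell the server that no more requests will follow.
for (var i = 0; i < 3; i++)
{
    await call.RequestStream.WriteAsync(new State());
    await Task.Delay(TimeSpan.FromSeconds(1));
}
await call.RequestStream.CompleteAsync();

// Completes once the server handler returns and the response stream ends.
await readTask;
```

Calling CompleteAsync is what lets the server's await foreach loop finish normally; until then the handler keeps waiting for further messages.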


Upvotes: 0
