Reputation: 71
I've created a bidirectional streaming gRPC endpoint using .NET 7:
public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
_logger.LogInformation("Client sent state {}", message);
}
}
I've deployed this code to Azure App Services, with Easy Auth Authentication enabled (the gRPC path is excluded from authentication), and configured the HTTP 2 settings as per the documentation. I'm using the same port in my app for regular HTTP 1 and HTTP 2 gRPC traffic (Kestrel->EndpointDefaults->Protocols == 'Http1AndHttp2'). Basic gRPC calls work without issues.
The streaming call however fails after 5-6 seconds, with this exception:
fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
Error when executing service method 'StreamCommands'.
System.InvalidOperationException: Can't read messages after the request is complete.
at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+MoveNext()
at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
at Grpc.AspNetCore.Server.Internal.CallHandlers.DuplexStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)
Azure App Services log:
fail: Middleware[0]
Failed to forward request to http://169.254.130.5:8080. Encountered a System.Net.Http.HttpRequestException exception after 5356.990ms with message: An error occurred while sending the request.. Check application logs to verify the application is properly handling HTTP traffic.
I expected the streaming gRPC call to stay open, which works when testing locally.
Upvotes: 1
Views: 389
Reputation: 3591
The code below uses the RPC method with protobuf, server-side implementation, and client-side implementation.
Server-side code:
await foreach
to read the stream of responses from the server. public class StockDataService : StockService.StockServiceBase
{
private readonly ILogger<StockDataService> _logger;
private readonly IJWTAuthenticationManager _jWTAuthenticationManager;
private static readonly List<Stock> _stocks = new List<Stock>
{
{new Stock {StockId = "FB", StockName = "Facebook"} },
{new Stock {StockId = "AAPL", StockName = "Apple"} },
{new Stock {StockId = "AMZN", StockName = "Amazon"} },
{new Stock {StockId = "NFLX", StockName = "Netflix"} },
{new Stock {StockId = "MSFT", StockName = "Microsoft"} },
{new Stock {StockId = "TSLA", StockName = "Tesla"} },
{new Stock {StockId = "GOOG", StockName = "Alphabet"} }
};
public StockDataService(ILogger<StockDataService> logger, IJWTAuthenticationManager jWTAuthenticationManager)
{
_logger = logger;
_jWTAuthenticationManager = jWTAuthenticationManager;
}
public override Task<StockListing> GetStockListings(Empty request, ServerCallContext context)
{
return Task.FromResult(new StockListing { Stocks = { _stocks } });
}
public override Task<StockPrice> GetStockPrice(Stock request, ServerCallContext context)
{
var rnd = new Random(100);
return Task.FromResult(
new StockPrice
{
Stock = _stocks.FirstOrDefault(x => x.StockId == request.StockId),
DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
Price = rnd.Next(100, 500).ToString()
});
}
public override async Task GetStockPriceStream(Empty request, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
{
int i = 10;
var rnd = new Random(100);
while (!context.CancellationToken.IsCancellationRequested && i > 0)
{
_stocks.ForEach(async s =>
{
var time = DateTime.UtcNow.ToTimestamp();
await responseStream.WriteAsync(new StockPrice
{
Stock = s,
DateTimeStamp = time,
Price = rnd.Next(100, 500).ToString()
});
});
await Task.Delay(500);
}
}
public override async Task<StocksPrices> GetStocksPrices(IAsyncStreamReader<Stock> requestStream, ServerCallContext context)
{
var rnd = new Random(100);
var inputStocksList = new List<Stock>();
await foreach (var request in requestStream.ReadAllAsync())
{
inputStocksList.Add(request);
_logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
}
var response = new StocksPrices();
foreach (var inputStock in inputStocksList)
{
response.StockPriceList.Add(
new StockPrice
{
Stock = inputStock,
DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
Price = rnd.Next(100, 500).ToString()
});
}
return response;
}
public override async Task GetCompanyStockPriceStream(IAsyncStreamReader<Stock> requestStream, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
{
var channel = Channel.CreateUnbounded<StockPrice>();
_ = Task.Run(async () =>
{
await foreach (var stockPrice in channel.Reader.ReadAllAsync())
{
await responseStream.WriteAsync(stockPrice);
}
});
var getCompanyStockPriceStreamRequestTasks = new List<Task>();
try
{
await foreach (var request in requestStream.ReadAllAsync())
{
_logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
getCompanyStockPriceStreamRequestTasks.Add(GetStockPriceAsync(request));
}
_logger.LogInformation("Client finished streaming");
}
catch (Exception ex)
{
_logger.LogError(ex, "An exception occurred");
}
await Task.WhenAll(getCompanyStockPriceStreamRequestTasks);
channel.Writer.TryComplete();
await channel.Reader.Completion;
_logger.LogInformation("Completed response streaming");
async Task GetStockPriceAsync(Stock stock)
{
var rnd = new Random(100);
for (int i = 0; i < 10; i++)
{
var time = DateTime.UtcNow.ToTimestamp();
await channel.Writer.WriteAsync(new StockPrice
{
Stock = _stocks.FirstOrDefault(x => x.StockId == stock.StockId),
Price = rnd.Next(100, 500).ToString(),
DateTimeStamp = time
});
await Task.Delay(500);
}
}
}
[AllowAnonymous]
public override Task<AuthResponse> Authenticate(ClientCred request, ServerCallContext context)
{
var token = _jWTAuthenticationManager.Authenticate(request.ClientId, request.ClientSecret);
if (token == null)
return null;
return Task.FromResult(new AuthResponse { BearerToken = token });
}
}
}
Client side:
RequestStream.WriteAsync
to send a stream of requests and completing the stream with RequestStream.CompleteAsync
public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
try
{
await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
{
_logger.LogInformation("Client sent state {}", message);
// Process the received state and send a corresponding command
var command = // your logic to create a response command based on the received state
await responseStream.WriteAsync(command);
}
}
catch (OperationCanceledException)
{
// Handle cancellation (stream closed by client or other reasons)
_logger.LogInformation("Stream closed by client or canceled.");
}
catch (Exception ex)
{
// Handle other exceptions
_logger.LogError(ex, "Error processing streaming request.");
}
}
Upvotes: 0