Streaming gRPC calls to an ASP.NET Core application fail in Azure App Service

Question (votes: 0, answers: 1)

I created a bidirectional streaming gRPC endpoint with .NET 7:

public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        _logger.LogInformation("Client sent state {}", message);
    }
}

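I deployed this code to Azure App Service with Easy Auth authentication enabled (the gRPC path is excluded from authentication) and configured the HTTP/2 settings according to the documentation. The application uses the same port for regular HTTP/1 traffic and HTTP/2 gRPC traffic (Kestrel->EndpointDefaults->Protocols == 'Http1AndHttp2'). Basic gRPC calls work.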
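For readers unfamiliar with that setting, the Kestrel protocol configuration described above can also be expressed in code. This is only a sketch, assuming the minimal hosting model in Program.cs; the notation in the question reads like appsettings.json keys, so the actual application probably sets this in configuration rather than code. MyGrpcService is the service name that appears in the stack trace.

```csharp
using Microsoft.AspNetCore.Server.Kestrel.Core;
using MyApp.Services;

var builder = WebApplication.CreateBuilder(args);

// Assumption: equivalent of Kestrel->EndpointDefaults->Protocols == 'Http1AndHttp2'.
// Every endpoint accepts both HTTP/1.x and HTTP/2 on the same port.
builder.WebHost.ConfigureKestrel(options =>
{
    options.ConfigureEndpointDefaults(listenOptions =>
    {
        listenOptions.Protocols = HttpProtocols.Http1AndHttp2;
    });
});

builder.Services.AddGrpc();

var app = builder.Build();
app.MapGrpcService<MyGrpcService>();
app.Run();
```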

However, streaming calls fail after 5-6 seconds with the following exception:

fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'StreamCommands'.
      System.InvalidOperationException: Can't read messages after the request is complete.
         at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+MoveNext()
         at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader`1 streamReader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
         at MyApp.Services.MyGrpcService.StreamCommands(IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream, ServerCallContext context) in /MyApp/Services/MyGrpcService.cs:line 70
         at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
         at Grpc.Shared.Server.DuplexStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader`1 requestStream, IServerStreamWriter`1 responseStream)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.DuplexStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)

Azure App Service logs:

fail: Middleware[0]
      Failed to forward request to http://169.254.130.5:8080. Encountered a System.Net.Http.HttpRequestException exception after 5356.990ms with message: An error occurred while sending the request.. Check application logs to verify the application is properly handling HTTP traffic.

I expect the streaming gRPC call to stay open, which it does when I test locally.

asp.net-core azure-web-app-service grpc
1 answer (votes: 0)

The code below shows RPC methods defined with protobuf, together with a server-side implementation and a client-side implementation.

Server-side code:

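  • The sample code below handles streaming calls and writes responses to the response stream asynchronously.
  • It uses await foreach to read the incoming request stream from the client.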
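    using System.Threading.Channels;
    using Google.Protobuf.WellKnownTypes;
    using Grpc.Core;
    using Microsoft.AspNetCore.Authorization;

    public class StockDataService : StockService.StockServiceBase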
    {
        private readonly ILogger<StockDataService> _logger;
        private readonly IJWTAuthenticationManager _jWTAuthenticationManager;

        private static readonly List<Stock> _stocks = new List<Stock> 
        {
            {new Stock {StockId = "FB", StockName = "Facebook"} },
            {new Stock {StockId = "AAPL", StockName = "Apple"} },
            {new Stock {StockId = "AMZN", StockName = "Amazon"} },
            {new Stock {StockId = "NFLX", StockName = "Netflix"} },
            {new Stock {StockId = "MSFT", StockName = "Microsoft"} },
            {new Stock {StockId = "TSLA", StockName = "Tesla"} },
            {new Stock {StockId = "GOOG", StockName = "Alphabet"} }
        }; 

        public StockDataService(ILogger<StockDataService> logger, IJWTAuthenticationManager jWTAuthenticationManager)
        {
            _logger = logger;
            _jWTAuthenticationManager = jWTAuthenticationManager;
        }

        public override Task<StockListing> GetStockListings(Empty request, ServerCallContext context)
        {
            return Task.FromResult(new StockListing { Stocks = { _stocks }  });
        }

        public override Task<StockPrice> GetStockPrice(Stock request, ServerCallContext context)
        {
            var rnd = new Random(100);
            return Task.FromResult(
                new StockPrice
                {
                    Stock = _stocks.FirstOrDefault(x => x.StockId == request.StockId),
                    DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
                    Price = rnd.Next(100, 500).ToString()
                });
        }

        public override async Task GetStockPriceStream(Empty request, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
        {
            // Send ten rounds of prices, stopping early if the client cancels the call.
            int i = 10;
            var rnd = new Random(100);
            while (!context.CancellationToken.IsCancellationRequested && i > 0)
            {
                // Await each write in turn: a gRPC response stream allows only one pending write at a time.
                foreach (var s in _stocks)
                {
                    await responseStream.WriteAsync(new StockPrice
                    {
                        Stock = s,
                        DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
                        Price = rnd.Next(100, 500).ToString()
                    });
                }

                i--;
                await Task.Delay(500);
            }
        }

        public override async Task<StocksPrices> GetStocksPrices(IAsyncStreamReader<Stock> requestStream, ServerCallContext context)
        {
            var rnd = new Random(100);
            var inputStocksList = new List<Stock>();
            await foreach (var request in requestStream.ReadAllAsync())
            {
                inputStocksList.Add(request);
                _logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
            }

            var response = new StocksPrices();
            foreach (var inputStock in inputStocksList)
            {
                response.StockPriceList.Add(
                    new StockPrice
                    {
                        Stock = inputStock,
                        DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
                        Price = rnd.Next(100, 500).ToString()
                    });
            }

            return response;
        }

        public override async Task GetCompanyStockPriceStream(IAsyncStreamReader<Stock> requestStream, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
        {
            
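            // Buffer outgoing prices in a channel so that a single task performs all writes to the response stream.
            var channel = Channel.CreateUnbounded<StockPrice>();

            // Background task that drains the channel into the response stream.
            var writeTask = Task.Run(async () =>
            {
                await foreach (var stockPrice in channel.Reader.ReadAllAsync())
                {
                    await responseStream.WriteAsync(stockPrice);
                }
            });

            // One price-producing task per requested stock.
            var getCompanyStockPriceStreamRequestTasks = new List<Task>();

            try
            {
                // Start producing prices for each stock as soon as the client sends it.
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    _logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
                    getCompanyStockPriceStreamRequestTasks.Add(GetStockPriceAsync(request));
                }

                _logger.LogInformation("Client finished streaming");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An exception occurred");
            }

            // Wait for every producer to finish, then signal that no more prices will be written.
            await Task.WhenAll(getCompanyStockPriceStreamRequestTasks);

            channel.Writer.TryComplete();

            // Wait until the writer task has sent everything to the client; this also surfaces any write error.
            await writeTask;

            _logger.LogInformation("Completed response streaming");

            // Local function: produces ten prices for one stock, half a second apart.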
            async Task GetStockPriceAsync(Stock stock)
            {
                var rnd = new Random(100);
                for (int i = 0; i < 10; i++)
                {
                    var time = DateTime.UtcNow.ToTimestamp();
                    await channel.Writer.WriteAsync(new StockPrice 
                    {
                        Stock = _stocks.FirstOrDefault(x => x.StockId == stock.StockId),
                        Price = rnd.Next(100, 500).ToString(),
                        DateTimeStamp = time
                    });

                    await Task.Delay(500);
                }
            }
        }

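        [AllowAnonymous]
        public override Task<AuthResponse> Authenticate(ClientCred request, ServerCallContext context)
        {
            var token = _jWTAuthenticationManager.Authenticate(request.ClientId, request.ClientSecret);

            // Returning null from a Task-returning gRPC method fails at runtime; report the failure as a gRPC status instead.
            if (token == null)
                throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid client credentials"));

            return Task.FromResult(new AuthResponse { BearerToken = token });
        }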
    }


  • Code reference taken from git.

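Revised server handler for the method in the question:

  • The code below reads the request stream using the call's cancellation token, writes a command back for each state it receives, and handles cancellation and other errors.
  • On the client, use RequestStream.WriteAsync to send the request stream and RequestStream.CompleteAsync to complete it.
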
public override async Task StreamCommands(IAsyncStreamReader<State> requestStream, IServerStreamWriter<Command> responseStream, ServerCallContext context)
{
    try
    {
        await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
        {
            _logger.LogInformation("Client sent state {State}", message);

            // Process the received state and send a corresponding command.
            // CreateCommand is a hypothetical helper standing in for your own logic.
            var command = CreateCommand(message);
            await responseStream.WriteAsync(command);
        }
    }
    catch (OperationCanceledException)
    {
        // Handle cancellation (stream closed by client or other reasons)
        _logger.LogInformation("Stream closed by client or canceled.");
    }
    catch (Exception ex)
    {
        // Handle other exceptions
        _logger.LogError(ex, "Error processing streaming request.");
    }
}
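
The client-side calls mentioned above (RequestStream.WriteAsync and RequestStream.CompleteAsync) could be used roughly as follows. This is a minimal sketch, not the asker's client: the address and the generated client type MyGrpc.MyGrpcClient are hypothetical names, since the .proto file is not shown, and it needs the project's generated gRPC code to compile. State and Command are the message types from the question.

```csharp
using Grpc.Core;
using Grpc.Net.Client;

// Hypothetical address; replace with the App Service URL.
using var channel = GrpcChannel.ForAddress("https://localhost:5001");

// Hypothetical generated client; the real name depends on the service name in the .proto file.
var client = new MyGrpc.MyGrpcClient(channel);

// Open the bidirectional streaming call.
using var call = client.StreamCommands();

// Read commands from the server in the background while states are being sent.
var readTask = Task.Run(async () =>
{
    await foreach (var command in call.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"Received command: {command}");
    }
});

// Send a few states, one per second, to keep the stream open for a while.
for (var i = 0; i < 3; i++)
{
    await call.RequestStream.WriteAsync(new State());
    await Task.Delay(TimeSpan.FromSeconds(1));
}

// Tell the server that no more states will follow, then wait for its responses to end.
await call.RequestStream.CompleteAsync();
await readTask;
```

Running a client like this against both the local server and the deployed App Service makes it easier to see whether the stream is cut off by the hosting environment rather than by the handler itself.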

