如何使用C#创建gRPC流式拦截器?

问题描述 投票:1回答:1

我已经弄清楚了如何创建单价RPC拦截器,但我无法弄清楚如何制作流式RPC拦截器.这是我到目前为止所做的。

public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream, ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        Console.WriteLine("ClientStreaming");
        var response = await base.ClientStreamingServerHandler(requestStream, context, continuation);
        return response;
    }

这段代码在每次启动客户端流的时候都会被控制台记录下来 我只是不知道如何将每一条传入的客户端消息都记录下来

尊敬的Jesse

c# .net grpc interceptor grpc-dotnet
1个回答
0
投票

我最近遇到了一个和你类似的情况,下面是我如何用拦截器解决的。 我们需要通过我们的gRPC API来测量消息吞吐量消息大小。 单元调用很直接,但是遇到了你在流媒体方面的问题。 下面是我为了拦截流媒体(我的情况是服务器,你的情况应该类似)所做的。

第一段代码是你已经有的拦截方法(这个是服务器流)。

    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, 
                                                                                                    ClientInterceptorContext<TRequest, TResponse> context, 
                                                                                                    AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
  {
     // Add the outgoing message size to the metrics
     mCollector.Add(mInterceptorName,context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);

     // This call returns the server stream, among other things and the server stream reader
     // is returned for the client to consume.
     var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);

     // Add the result message size to the metrics
     mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.IN, prelimResponse as IMessage);
     // Wrap the response stream object with our implementation that will log the size and then
     // proxy that to the client.

     var response = new AsyncServerStreamingCall<TResponse>(new AsyncStreamReaderWrapper<TResponse>(prelimResponse.ResponseStream,
                                                                                                    mInterceptorName,
                                                                                                    context.Method.ServiceName, 
                                                                                                    context.Method.Name,mCollector), 
                                                                                                    prelimResponse.ResponseHeadersAsync, 
                                                                                                    prelimResponse.GetStatus, 
                                                                                                    prelimResponse.GetTrailers, 
                                                                                                    prelimResponse.Dispose);
     // return the wrapped stream to the client
     return response;
  }

以及AsyncServerStreamReaderWrapper的实现,接收对象,测量,记录它的大小,然后传给客户端。 这个包装器是需要的,因为Stream阅读器只能有一个消费者,如果我有多个阅读器就会产生错误,这是有道理的。

   /// <summary>
   /// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object 
   /// before handing off to the client for the purpose of throughput measurements and metrics
   /// collection
   /// </summary>
   /// <typeparam name="T">type of object contained within the stream</typeparam>
   public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
   {
      private IAsyncStreamReader<T> mInnerImplementation = null;
      private NetworkMetricsCollectionService mCollector = null;
      private string mId = string.Empty;
      private string mService = string.Empty;
      private string mMethod = string.Empty;

      public T Current => mInnerImplementation.Current;

      /// <summary>
      /// Advances the reader to the next element in the sequence, returning the result asynchronously.
      /// </summary>
      /// <param name="cancellationToken">Cancellation token that can be used to cancel the 
      /// operation.</param>
      /// <returns>Task containing the result of the operation: true if the reader was successfully
      /// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
      public async Task<bool> MoveNext(CancellationToken cancellationToken)
      {
         bool result = await mInnerImplementation.MoveNext(cancellationToken);
         if (result)
         {
            mCollector.Add(mId,mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
         }
         return result;
      }

      /// <summary>
      /// Parameterized Constructor
      /// </summary>
      /// <param name="aInnerStream">inner stream reader to wrap</param>
      /// <param name="aService">service name for metrics reporting</param>
      /// <param name="aMethod">method name for metrics reporting</param>
      /// <param name="aCollector">metrics collector</param>
      public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
      {
         mInnerImplementation = aInnerStream;
         mId = aId;
         mService = aService;
         mMethod = aMethod;
         mCollector = aCollector;
      }
   }

我知道这不是你要找的确切场景,但我相信你的实现会类似,但使用StreamWriter而不是StreamReader,并且没有试图测量消息大小的位。

© www.soinside.com 2019 - 2024. All rights reserved.