我已经弄清楚了如何创建单价RPC拦截器,但我无法弄清楚如何制作流式RPC拦截器.这是我到目前为止所做的。
public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
IAsyncStreamReader<TRequest> requestStream, ServerCallContext context,
ClientStreamingServerMethod<TRequest, TResponse> continuation)
{
Console.WriteLine("ClientStreaming");
var response = await base.ClientStreamingServerHandler(requestStream, context, continuation);
return response;
}
这段代码在每次启动客户端流的时候都会被控制台记录下来 我只是不知道如何将每一条传入的客户端消息都记录下来
尊敬的Jesse
我最近遇到了一个和你类似的情况,下面是我如何用拦截器解决的。 我们需要通过我们的gRPC API来测量消息吞吐量消息大小。 单元调用很直接,但是遇到了你在流媒体方面的问题。 下面是我为了拦截流媒体(我的情况是服务器,你的情况应该类似)所做的。
第一段代码是你已经有的拦截方法(这个是服务器流)。
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
{
// Add the outgoing message size to the metrics
mCollector.Add(mInterceptorName,context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);
// This call returns the server stream, among other things and the server stream reader
// is returned for the client to consume.
var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);
// Add the result message size to the metrics
mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.IN, prelimResponse as IMessage);
// Wrap the response stream object with our implementation that will log the size and then
// proxy that to the client.
var response = new AsyncServerStreamingCall<TResponse>(new AsyncStreamReaderWrapper<TResponse>(prelimResponse.ResponseStream,
mInterceptorName,
context.Method.ServiceName,
context.Method.Name,mCollector),
prelimResponse.ResponseHeadersAsync,
prelimResponse.GetStatus,
prelimResponse.GetTrailers,
prelimResponse.Dispose);
// return the wrapped stream to the client
return response;
}
以及AsyncServerStreamReaderWrapper的实现,接收对象,测量,记录它的大小,然后传给客户端。 这个包装器是需要的,因为Stream阅读器只能有一个消费者,如果我有多个阅读器就会产生错误,这是有道理的。
/// <summary>
/// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object
/// before handing off to the client for the purpose of throughput measurements and metrics
/// collection
/// </summary>
/// <typeparam name="T">type of object contained within the stream</typeparam>
public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
{
private IAsyncStreamReader<T> mInnerImplementation = null;
private NetworkMetricsCollectionService mCollector = null;
private string mId = string.Empty;
private string mService = string.Empty;
private string mMethod = string.Empty;
public T Current => mInnerImplementation.Current;
/// <summary>
/// Advances the reader to the next element in the sequence, returning the result asynchronously.
/// </summary>
/// <param name="cancellationToken">Cancellation token that can be used to cancel the
/// operation.</param>
/// <returns>Task containing the result of the operation: true if the reader was successfully
/// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
bool result = await mInnerImplementation.MoveNext(cancellationToken);
if (result)
{
mCollector.Add(mId,mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
}
return result;
}
/// <summary>
/// Parameterized Constructor
/// </summary>
/// <param name="aInnerStream">inner stream reader to wrap</param>
/// <param name="aService">service name for metrics reporting</param>
/// <param name="aMethod">method name for metrics reporting</param>
/// <param name="aCollector">metrics collector</param>
public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
{
mInnerImplementation = aInnerStream;
mId = aId;
mService = aService;
mMethod = aMethod;
mCollector = aCollector;
}
}
我知道这不是你要找的确切场景,但我相信你的实现会类似,但使用StreamWriter而不是StreamReader,并且没有试图测量消息大小的位。