我试图通过GRPC流公开一个observable。我的简化代码如下所示:
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.ForEachAsync(async value =>
{
await responseStream.WriteAsync(value);
});
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我收到以下错误:
W0123 14:30:59.709715 Grpc.Core.Internal.ServerStreamingServerCallHandler
2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler
2.d__4.MoveNext()W0123 14:30:59.732716 Grpc.Core.Server处理RPC时出现异常。 System.InvalidOperationException:由于对象的当前状态,操作无效。 at Grpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple
2 optionalWrite)at Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext()---在抛出异常的前一个位置的批处理跟踪结束---在System.Runtime。 CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务),位于Grpc.Core.Server.d__34.MoveNext()的System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)
你会怎么建议处理这个?我想我需要在同一个线程中处理ForEachAsync。
使用gRPC流API,您一次只能写一个项目。如果在上一个完成之前启动另一个WriteAsync()操作,则会出现异常。您还需要在从方法处理程序返回之前完成所有写入操作(在本例中为Feed方法)。一次只允许一次写入的原因是为了确保gRPC的流量控制运行良好。
在您的情况下,Rx API似乎无法确保,因此解决此问题的一种方法是使用中间缓冲区。
这个怎么样?
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.Scan(Task.CompletedTask, (preceding,value) =>
preceding.ContinueWith(_ => responseStream.WriteAsync(value))
);
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我还没有测试过,但我认为无论如何都可以。
有两个问题阻止了OP的方法按预期工作,这两个问题源于responseStream.WriteAsync的期望,即一次只能有一个写入挂起。
var eventLoop = new EventLoopScheduler();
var o = myObservable.ObserveOn(eventLoop);
responseStream.WriteAsync<Response>(value).Wait();
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var o = getMyObservable(request);
try
{
var eventLoop = new EventLoopScheduler(); //just for example, use whatever scheduler makes sense for you.
await o.ObserveOn(eventLoop).ForEachAsync<Response>(value =>
{
responseStream.WriteAsync(value).Wait();
},
context.CancellationToken);
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
Log.Info("Session ended normally");
}