Rx的连接状态和错误元数据最佳实践

问题描述 投票:0回答:1

系统

我正在构建服务器端流计算引擎,以对金融服务公司的市场价格进行实时计算。

这个想法是,计算服务器获得与各种市场数据源(主要是彭博社)的连接,并在新输入(价格,外汇汇率等)到来时重新计算各种相对复杂的计算。

然后,这些计算结果将被推送到使用SignalR连接到服务器的各种客户端(WPF桌面)。在客户端和服务器上,我将使用Rx和IObservable来实现用于计算的管道并从中级联下游活动。

问题

事实是,计算的有效状态可能是0.3995,但可能是“等待价格”,甚至是“服务器停机”。

我不想使用内置的Rx错误系统,因为它看起来很脆弱。一旦遇到任何异常,IObservable将永远终止,必须重新订阅才能恢复。我希望我的流是健壮和持久的,并且期望可能的可恢复错误状态是数据而不是异常。

我目前的想法

我想将返回数据以一种monad的形式包装在流中,就像这样:

有人有更好的建议吗?或一些可以帮助我正确解决问题的指针?

    public enum TransientState 
    {
        NotReady = 0,
        Ok = 1,
        Faulted = 2,
        Fatal = 3
    }

    public struct RxMessage<T> 
    {
        public TransientState State { get; set; }
        public DateTime TimestampUtc { get; set; }
        public string Fault { get; set; }
        public T Payload { get; set; }
    }
c# .net design-patterns signalr system.reactive
1个回答
0
投票

您的方法有意义,并且肯定会起作用。

我可能希望使用单独的状态流。使用单独的流,您将不再继续推送相同的状态数据,并且客户端可以将对新数据和状态更改的处理分开。

例如:

var statusStream = new Subject<TransientState>();
var tickStream = new Subject<float>();

var finalStream = statusStream
    .Select(s => s == TransientState.Ok ? tickStream : Observable.Never<float>() )
    .Switch();

finalStream.Dump(); //linqpad

statusStream.OnNext(TransientState.NotReady);
tickStream.OnNext(0);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(1);
tickStream.OnNext(2);
tickStream.OnNext(3);
statusStream.OnNext(TransientState.Faulted);
tickStream.OnNext(4);
tickStream.OnNext(5);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(6);

输出为1 2 3 6,只有状态为OK时才出现的数字。

© www.soinside.com 2019 - 2024. All rights reserved.