我有一种方法将经过过滤的RX流作为Iobservable返回:
public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{
return _ratesObserver.Stream
.Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
.Select(o => GetFxDeal(o, negotiation));
}
问题是,对于某些值,流不会经常更改,因此我需要使用第一个值对其进行初始化
我该怎么做?我读到Subject
既可以是观察者,也可以是可观察者。因此,我认为我需要以某种方式将其订阅为Subject
,在流中添加第一条消息,然后将其设置为现在的状态。但无法弄清楚该怎么做
任何想法?
您是否尝试过StartWith
?
public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{
var obs = _ratesObserver.Stream
.Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
.Select(o => GetFxDeal(o, negotiation));
return condition ? obs.StartWith(new FxDeal()) : obs;
}