向RX流/ IObservable添加初始值

问题描述 投票:0回答:1

我有一种方法将经过过滤的RX流作为Iobservable返回:

public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{   
    return _ratesObserver.Stream
        .Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
        .Select(o => GetFxDeal(o, negotiation));
}

问题是,对于某些值,流不会经常更改,因此我需要使用第一个值对其进行初始化

我该怎么做?我读到Subject既可以是观察者,也可以是可观察者。因此,我认为我需要以某种方式将其订阅为Subject,在流中添加第一条消息,然后将其设置为现在的状态。但无法弄清楚该怎么做

任何想法?

c# system.reactive
1个回答
1
投票

您是否尝试过StartWith

public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{   
    var obs = _ratesObserver.Stream
        .Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
        .Select(o => GetFxDeal(o, negotiation));

    return condition ? obs.StartWith(new FxDeal()) : obs;
}
© www.soinside.com 2019 - 2024. All rights reserved.