当一个属性发生变化时,重新启动一个可观察的订阅。

问题描述 投票:3回答:1

我有一个简单的订阅,是在MyClass中创建的。

//myService.Connect returns IObservable<MyData>
myService.Connect(requestParameters)
         .Subscribe(DoSomething);

在MyClass中,有一个我正在监听的属性,它被用来创建一个新的属性 请求参数. 我的目标是每当该属性发生变化时,就使用新的requestParameters再次调用myService.Connect,并取消之前的订阅。

有没有一种直接的方法可以用RX来实现这个目标? 我曾看过Switch,但那是在订阅一个发出Observable的Observable时使用的。 我也看了一下TakeUntil,我可以保持订阅的活力,直到属性变化,但我不知道如何触发自动重新订阅。

在一天结束时,我可以在属性改变时自己调用myService.Connect,但我想看看是否有一些现有的RX功能可以代替。

c# system.reactive
1个回答
2
投票

根据你想做的事情,你可以用以下方法来实现 开关操作员 可能确实是你要找的东西。它可以用来向公众提供一个简单的观测值,底层的源观测值可以 "切换 "到不同的观测值。只要类型相同,它就可以工作。事实上, Switch() 操作符要求观测值必须是相同的类型。请看上面文档中的下图。

Marble diagram of the Switch operator 大理石图 Switch 经营者从 http:/reactivex.io

上面的部分显示的是处于活动状态的观测值,并向底部的 "可见 "观测值发送值,但在某些时候,原来的源被改变或 "切换"(圆圈到三角形),而底部的 "可见 "观测值则像什么都没发生一样。但是在某些时候,原始源被改变或 "切换"(圆圈到三角形),而底部的 "可见 "观测值却像什么都没发生过一样产生值。

在您的案例中,您的观测值与您的 myService.Connect(requestParameters). 你把这个值填到一个主题的 IObservable<YourType> 经由 OnNext() 方法。检查下面的例子。

ISubject<IObservable<int>> sourceOfObservables = new Subject<IObservable<int>>();

IObservable<int> publicSource = sourceOfObservables.Switch();

IDisposable testSubscription = publicSource.Subscribe(it => {
    Console.WriteLine("Value received: "+it);
});

sourceOfObservables.OnNext(Observable.Range(1, 5));
sourceOfObservables.OnNext(Observable.Range(10, 5));
sourceOfObservables.OnNext(Observable.Range(100, 5));

这将产生以下输出。

Value received: 10
Value received: 11
Value received: 12
Value received: 13
Value received: 14
Value received: 100
Value received: 101
Value received: 102
Value received: 103
Value received: 104

正如你所看到的,你只有一个活动的订阅。testSubscription 在这里你可以读到15个不同的值。然而,他们来自三个独立的来源observables。这类似于 Concat 在这里,你可以连接观测值,但在这个时候,你有点 "热交换 "一个活跃的订阅和另一个,而不需要每个人都重新订阅到你的新obserable。

你可以通过首先定义一个新的观测点来解决这个问题。ISubject<IObservable<YourType>>. 然后当你的 requestParameters 变化,你推一个新的 IObservable<YourType> 例子 myService.Connect() 方法到你的 ISubject<IObservable<YourType>> 实例与 OnNext() 方法(如示例中所示)。从现在开始,接下来的值来自于 可观察。

奖金。

当... requestParameters 已经来自一个可观察的对象,你可以使用 Select() 经营者 System.Reactive.Linq,而不是普通的LINQ)来建立的。IObservable<YourType> 自动,你可以 "平铺直叙"、"伪连词 "或用文字 "切换"。Switch():

IObservable<string> requestParametersChangeStream;

requestParametersChangeStream             // produces "string" objects 
    .Select(it => myService.Connect(it))  // produces "IObservable<YourType>" objects
    .Switch()                             // make it so it looks like
                                          // an IObservable<YourType>
    .Subscribe(it => DoWhateverYouWant(it));
© www.soinside.com 2019 - 2024. All rights reserved.