我有一个简单的订阅,是在MyClass中创建的。
//myService.Connect returns IObservable<MyData>
myService.Connect(requestParameters)
.Subscribe(DoSomething);
在MyClass中,有一个我正在监听的属性,它被用来创建一个新的属性 请求参数. 我的目标是每当该属性发生变化时,就使用新的requestParameters再次调用myService.Connect,并取消之前的订阅。
有没有一种直接的方法可以用RX来实现这个目标? 我曾看过Switch,但那是在订阅一个发出Observable的Observable时使用的。 我也看了一下TakeUntil,我可以保持订阅的活力,直到属性变化,但我不知道如何触发自动重新订阅。
在一天结束时,我可以在属性改变时自己调用myService.Connect,但我想看看是否有一些现有的RX功能可以代替。
根据你想做的事情,你可以用以下方法来实现 开关操作员 可能确实是你要找的东西。它可以用来向公众提供一个简单的观测值,底层的源观测值可以 "切换 "到不同的观测值。只要类型相同,它就可以工作。事实上, Switch()
操作符要求观测值必须是相同的类型。请看上面文档中的下图。
大理石图
Switch
经营者从 http:/reactivex.io
上面的部分显示的是处于活动状态的观测值,并向底部的 "可见 "观测值发送值,但在某些时候,原来的源被改变或 "切换"(圆圈到三角形),而底部的 "可见 "观测值则像什么都没发生一样。但是在某些时候,原始源被改变或 "切换"(圆圈到三角形),而底部的 "可见 "观测值却像什么都没发生过一样产生值。
在您的案例中,您的观测值与您的 myService.Connect(requestParameters)
. 你把这个值填到一个主题的 IObservable<YourType>
经由 OnNext()
方法。检查下面的例子。
ISubject<IObservable<int>> sourceOfObservables = new Subject<IObservable<int>>();
IObservable<int> publicSource = sourceOfObservables.Switch();
IDisposable testSubscription = publicSource.Subscribe(it => {
Console.WriteLine("Value received: "+it);
});
sourceOfObservables.OnNext(Observable.Range(1, 5));
sourceOfObservables.OnNext(Observable.Range(10, 5));
sourceOfObservables.OnNext(Observable.Range(100, 5));
这将产生以下输出。
Value received: 10
Value received: 11
Value received: 12
Value received: 13
Value received: 14
Value received: 100
Value received: 101
Value received: 102
Value received: 103
Value received: 104
正如你所看到的,你只有一个活动的订阅。testSubscription
在这里你可以读到15个不同的值。然而,他们来自三个独立的来源observables。这类似于 Concat
在这里,你可以连接观测值,但在这个时候,你有点 "热交换 "一个活跃的订阅和另一个,而不需要每个人都重新订阅到你的新obserable。
你可以通过首先定义一个新的观测点来解决这个问题。ISubject<IObservable<YourType>>
. 然后当你的 requestParameters
变化,你推一个新的 IObservable<YourType>
例子 myService.Connect()
方法到你的 ISubject<IObservable<YourType>>
实例与 OnNext()
方法(如示例中所示)。从现在开始,接下来的值来自于 该 可观察。
奖金。
当... requestParameters
已经来自一个可观察的对象,你可以使用 Select()
经营者 System.Reactive.Linq
,而不是普通的LINQ)来建立的。IObservable<YourType>
自动,你可以 "平铺直叙"、"伪连词 "或用文字 "切换"。Switch()
:
IObservable<string> requestParametersChangeStream;
requestParametersChangeStream // produces "string" objects
.Select(it => myService.Connect(it)) // produces "IObservable<YourType>" objects
.Switch() // make it so it looks like
// an IObservable<YourType>
.Subscribe(it => DoWhateverYouWant(it));