我正在构建一个消息处理管道,并注意到当最后一个观察者处理订阅时,observable仍然在通过数据。
我查看了Rx文档,我的基于它的假设是,根据文档,'RefCount()'会在最后一个观察者取消订阅后断开观察点:
然后,RefCount会跟踪有多少其他观察者订阅它,并且在最后一个观察者完成之前不会断开与底层可连接Observable的连接。
为了说明这个问题,我在下面创建了一个非常简约的例子:
class Program
{
static void Main(string[] args)
{
_ = SimulateObservableIssue();
Console.ReadKey();
}
public static async Task SimulateObservableIssue()
{
IObservable<int> source = Observable.Create<int>(async (observer) =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Source publishing {i}");
observer.OnNext(i);
await Task.Delay(1000);
}
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observable is disposed"));
});
var multiSource = source.Publish().RefCount();
var subscription = multiSource.Subscribe(x => Console.WriteLine("Observer received: " + x));
await Task.Delay(3000);
subscription.Dispose();
Console.WriteLine("Subscription disposed");
}
}
为什么在'subscription.Dispose()之后,observable仍在尝试生成数据?
谢谢!
你的source
观察不尊重你提到的Observable合约。如果你用这个替换source
:
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(i => Console.WriteLine($"Source publishing {i}"), () => Console.WriteLine("Observable is disposed"))
.Take(10);
......你会发现它按预期工作。
至于为什么,想想一个有两个阶段的观察:订阅和观察。无论订阅取消如何,订阅时间内发生的代码总是会发生。 Observable.Create
代码是所有订阅代码。
我写的可观察的是所有观察代码(因为大多数可观察代码应该是)。因此它适当地响应订阅取消。