RefCount()中的Observable不会停止发布

问题描述 投票:0回答:1

我正在构建一个消息处理管道,并注意到当最后一个观察者处理订阅时,observable仍然在通过数据。

我查看了Rx文档,我的基于它的假设是,根据文档,'RefCount()'会在最后一个观察者取消订阅后断开观察点:

然后,RefCount会跟踪有多少其他观察者订阅它,并且在最后一个观察者完成之前不会断开与底层可连接Observable的连接。

为了说明这个问题,我在下面创建了一个非常简约的例子:

class Program
{
    static void Main(string[] args)
    {
        _ = SimulateObservableIssue();

        Console.ReadKey();
    }

    public static async Task SimulateObservableIssue()
    {
        IObservable<int> source = Observable.Create<int>(async (observer) =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Source publishing {i}");
                observer.OnNext(i);
                await Task.Delay(1000);
            }

            observer.OnCompleted();

            return Disposable.Create(() => Console.WriteLine("Observable is disposed"));
        });

        var multiSource = source.Publish().RefCount();

        var subscription = multiSource.Subscribe(x => Console.WriteLine("Observer received: " + x));

        await Task.Delay(3000);

        subscription.Dispose();

        Console.WriteLine("Subscription disposed");

    }
}

and the console output

为什么在'subscription.Dispose()之后,observable仍在尝试生成数据?

谢谢!

c# .net rx.net
1个回答
0
投票

你的source观察不尊重你提到的Observable合约。如果你用这个替换source

    var source = Observable.Interval(TimeSpan.FromSeconds(1))
        .Do(i => Console.WriteLine($"Source publishing {i}"), () => Console.WriteLine("Observable is disposed"))
        .Take(10);

......你会发现它按预期工作。

至于为什么,想想一个有两个阶段的观察:订阅和观察。无论订阅取消如何,订阅时间内发生的代码总是会发生。 Observable.Create代码是所有订阅代码。

我写的可观察的是所有观察代码(因为大多数可观察代码应该是)。因此它适当地响应订阅取消。

© www.soinside.com 2019 - 2024. All rights reserved.