是否有一个Rx.Net运算符告诉已订阅观察者的数量?

问题描述 投票:0回答:1

如果要正确地将观察者分配给长期运行的服务(暴露给IObservable的服务,我会尝试调试。我想知道是否有一个运算符或我们可以创建的东西来记录活动观察者的数量,例如。

public class NewsService
{
    IObservable<Article> GetArticles();
} 


NewsService.Instance
    .GetArticles()
    .DoCount(x=> Trace.Writeline("The current count is {x}"))
    .Subscribe();

[有一个建议的解决方案here,适用于此主题。如果我们无权访问主题并且图书馆公开了IObservable,该怎么办。

system.reactive
1个回答
0
投票

通常来说,没有定义任意可观察序列的用户计数的概念。

对于诸如Observable.Interval之类的冷可观察对象,每次您订阅该可观察对象时,都会创建一个新的管道实例,从它的角度来看,它一次只能看到一个观察者。

尽管如此,我们可以预热可观察到的现象,并观看订阅的来来去去。

    public static IObservable<T> RefCount<T>(this IObservable<T> source, Action<int> onChange)
    {
        var subscribers = 0;
        var shared = source.Publish().RefCount();
        void callback(int count) => onChange(Interlocked.Add(ref subscribers, count));

        return Observable.Create<T>(observer =>
        {
            callback(+1);
            var subscription = shared.Subscribe(observer);
            var dispose = Disposable.Create(() => callback(-1));

            return new CompositeDisposable(subscription, dispose);
        });
    }

Demo

        var values = 
            Observable
            .Interval(TimeSpan.FromSeconds(0.1))
            .RefCount(count => Console.WriteLine($"Subscribers: {count}"));

        values.Take(5).Subscribe();
        values.Take(10).Subscribe();
        values.Take(15).Subscribe();

输出

Subscribers: 1
Subscribers: 2
Subscribers: 3
Subscribers: 2
Subscribers: 1
Subscribers: 0

现在,这行得通,因为我们对可观察的父级有一个共享的视图。因此,请尝试使所有订阅都指向同一实例。

_articles = GetArticles().RefCount(count => Console.WriteLine($"Subscribers: {count}")));
...
_articles.Subscribe();
© www.soinside.com 2019 - 2024. All rights reserved.