无功扩展 - 如何进行可变速率轮询?

问题描述 投票:0回答:1

我想知道如何以设定的间隔轮询/调用方法。我希望能够改变这个常规间隔。所以这样的事情。

[Reactive]
public TimeSpan Rate { get; set; }

IObservable<TimeSpan> rate = this.WhenAnyValue(vm => vm.Rate);

// poll should emit at the current rate set in `rate observable`
rate.Poll().InvokeCommand(cmd);

在大理石图

rate         ---4----------2--------∞----------4----------------
Poll         ---X---X---X--X-X-X-X-XX----------X---X---X---X---X

X - the time when method is executed

需要注意的是,rate中的新值应该强制Poll立即发射并取消之前应该发出的任何值。

我试着写一个Poll Operator。然而,当我试图传递一个大的TimeSpan时,它会抱怨。

    public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
    {
        scheduler = scheduler ?? NewThreadScheduler.Default;

        return Observable.Create<TimeSpan>(observer =>
        {
            return scheduler.ScheduleAsync(async (s, ct) =>
            {
                var timerTokenSource = new CancellationTokenSource();
                var compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);

                TimeSpan interval = TimeSpan.MaxValue;

                sourceObservable.Subscribe(t =>
                    {
                        interval = t;
                        timerTokenSource.Cancel();
                        timerTokenSource = new CancellationTokenSource();
                        compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);
                    },
                    ct);

                while (!ct.IsCancellationRequested)
                {
                    await s.Sleep(interval, compositeTokenSource.Token);
                    observer.OnNext(interval);
                }
            });
        });
    }
reactive-programming system.reactive reactiveui rx.net
1个回答
4
投票

我想这就是你想要的:

public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
{
    scheduler = scheduler ?? NewThreadScheduler.Default;

    return 
        sourceObservable
            .Select(ts => Observable.Timer(TimeSpan.Zero, ts).Select(x => ts))
            .ObserveOn(scheduler)
            .Switch();
}

我不知道你为什么要回归TimeSpan。这是为什么?

© www.soinside.com 2019 - 2024. All rights reserved.