How can I implement .Debounce() / .DebounceDistinct() extension methods for observable streams in Rx.NET (C#)?

Question (votes: 0, answers: 1)

Basically what the title says. I can't find any ready-made .Debounce() extension method (similar to the way Throttle() is available out of the box).

Below is a rough and possibly wrong idea of how to implement it (based on my understanding of the Rx.NET mindset).

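I'm already aware that the following implementation is most likely not thread-safe, and that it mixes debounce with distinct, which isn't ideal. I need some better ideas to make it robust: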

        /// <summary>
        /// Emits an element from the source observable sequence if and only if there are no other elements emitted during a given period of time specified by debounceTime.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to debounce.</param>
        /// <param name="debounceTime">Debouncing duration for each element.</param>
        /// <param name="customComparer">The customComparer to use to compare each next element with the previous one</param>
        /// <returns>The debounced sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="debounceTime"/> is less than or equal to TimeSpan.Zero.</exception>
        /// <remarks>
        /// <para>
        /// This operator debounces the source sequence by holding on to each element for the duration specified in <paramref name="debounceTime"/>. If another
        /// element is produced within this time window, the previous element is dropped and a new timer is started for the current element to try and debounce it,
        /// thus restarting this whole process. For streams that never have gaps larger than or equal to <paramref name="debounceTime"/> between elements, the resulting
        /// stream won't produce any elements at all. In order to reduce the volume of a stream whilst guaranteeing the periodic production of elements, consider using the
        /// Observable.Sample set of operators.
        /// </para>
        /// </remarks>
        public static IObservable<TSource> DebounceX<TSource>(this IObservable<TSource> source, TimeSpan debounceTime, IEqualityComparer<TSource> customComparer = null)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (debounceTime.TotalMilliseconds <= 0) throw new ArgumentOutOfRangeException(nameof(debounceTime));

            var debouncedElementsStream = Observable.Create<TSource>(Subscribe);

            return debouncedElementsStream;
            
            IDisposable Subscribe(IObserver<TSource> observer)
            {
                // per-subscription state: the pending one-shot timer and the element it is waiting on
                var timer = (Timer)null;
                var previousElement = default(TSource);

                var comparer = customComparer ?? EqualityComparer<TSource>.Default;

                return source.Subscribe(onNext: OnEachElementEmission_, onError: OnError_, onCompleted: OnCompleted_);

                void OnEachElementEmission_(TSource value)
                {
                    if (timer != null && comparer.Equals(previousElement, value)) return;

                    previousElement = value;

                    timer?.Dispose();
                    timer = new Timer(
                        state: null,
                        period: Timeout.InfiniteTimeSpan, //  we only want the timer to fire once
                        dueTime: debounceTime, //             after the debounce time has passed
                        callback: _ => OnElementHasRemainedStableForTheSpecificPeriod(value)
                    );
                }

                void OnError_(Exception exception)
                {
                    timer?.Dispose();
                    timer = null;

                    observer.OnError(exception);
                }

                void OnCompleted_()
                {
                    timer?.Dispose();
                    timer = null;

                    observer.OnCompleted();
                }

                void OnElementHasRemainedStableForTheSpecificPeriod(TSource value)
                {
                    timer?.Dispose();
                    timer = null;

                    observer.OnNext(value); //00

                    //00  OnNext(value) essentially emits in the stream the debounced element that has remained stable for the whole period of time specified by debounceTime
                }
            }
        }
c# .net reactive-programming system.reactive rx.net
1 answer
1 vote

Please check this example, which debounces without checking for duplicates:

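using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        // Each value becomes a single-element stream delayed by `delay`;
        // Switch() drops the pending stream whenever a newer value arrives.
        IObservable<T> Debounce<T>(IObservable<T> source, TimeSpan delay)
        {
            return source
                .Do(x => TimestampedPrint($"Source: {x}"))
                .Select(x => Observable.Return(x).Delay(delay))
                .Switch();
        }
        void TimestampedPrint(object o) => Console.WriteLine($"{DateTime.Now:HH.mm.ss.fff}: {o}");

        TimestampedPrint("Started");

        Subject<int> subject = new Subject<int>();

        Debounce(subject, TimeSpan.FromSeconds(1))
            .Subscribe(x => TimestampedPrint($"Result: {x}"));

        subject.OnNext(1);
        Thread.Sleep(1100);
        subject.OnNext(2);
        Thread.Sleep(1100);
        subject.OnNext(3);
        Thread.Sleep(500);
        subject.OnNext(4);
        Thread.Sleep(500);
        subject.OnNext(5);

        Console.ReadLine(); // keep the process alive until the last delayed value arrives
    }
}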

Result:

23.24.58.633: Started
23.24.58.676: Source: 1
23.24.59.708: Result: 1
23.24.59.783: Source: 2
23.25.00.785: Result: 2
23.25.00.884: Source: 3
23.25.01.386: Source: 4
23.25.01.888: Source: 5
23.25.02.888: Result: 5
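
If you want to check this behaviour without Thread.Sleep, the same Select/Delay/Switch idea can be driven by a virtual clock. The following is only a sketch under a few assumptions: the names DebounceSketch, DebounceWithSwitch and Run are made up for illustration, the scheduler parameter is my addition, and HistoricalScheduler (from System.Reactive.Concurrency) stands in for real time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DebounceSketch
{
    // Hypothetical extension-method form of the Select/Delay/Switch operator;
    // the scheduler parameter is an assumption added so time can be controlled.
    public static IObservable<T> DebounceWithSwitch<T>(
        this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (scheduler == null) throw new ArgumentNullException(nameof(scheduler));

        return source
            .Select(x => Observable.Return(x).Delay(delay, scheduler)) // one delayed single-element stream per value
            .Switch();                                                 // a newer value cancels the pending one
    }

    // Replays the timing of the wall-clock demo on a manually advanced clock.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var subject = new Subject<int>();
        var results = new List<int>();

        using (subject.DebounceWithSwitch(TimeSpan.FromSeconds(1), scheduler)
                      .Subscribe(x => results.Add(x)))
        {
            subject.OnNext(1);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
            subject.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
            subject.OnNext(3);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            subject.OnNext(4);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            subject.OnNext(5);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1000));
        }

        return results;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

With the same gaps as in the run above (1100 ms, 1100 ms, 500 ms, 500 ms), I would expect Run() to collect 1, 2 and 5, since 3 and 4 are each replaced before their one-second delay elapses.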

An extension with an equality comparer, if I understood your idea correctly:

    IObservable<T> Debounce<T>(IObservable<T> source, TimeSpan delay, IEqualityComparer<T> comparer = null)
    {
        return source
        .Do(x => TimestampedPrint($"Source: {x}"))
        .DistinctUntilChanged(comparer ?? EqualityComparer<T>.Default)
        .Select(x => Observable.Return(x).Delay(delay))
        .Switch();
    }

When only "1" is published, this version gives me the following output:

23.26.30.288: Started
23.26.30.346: Source: 1
23.26.31.376: Result: 1
23.26.31.455: Source: 1
23.26.32.557: Source: 1
23.26.33.057: Source: 1
23.26.33.558: Source: 1
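
One more thing worth noting: in Rx.NET the built-in Throttle operator restarts its timer on every element and only emits once the source has been quiet for the given duration, which is what most other libraries call debounce (the XML remarks quoted in the question describe exactly this). So a thin wrapper may be all that is needed. A minimal sketch, assuming the names DebounceExtensions, Debounce, DebounceDistinct and Run, which are hypothetical and not part of Rx.NET:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DebounceExtensions
{
    // Hypothetical name; simply delegates to the built-in Throttle operator.
    public static IObservable<T> Debounce<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = null)
        => source.Throttle(dueTime, scheduler ?? DefaultScheduler.Instance);

    // Hypothetical name; drops consecutive duplicates first, then debounces.
    public static IObservable<T> DebounceDistinct<T>(
        this IObservable<T> source, TimeSpan dueTime,
        IEqualityComparer<T> comparer = null, IScheduler scheduler = null)
        => source
            .DistinctUntilChanged(comparer ?? EqualityComparer<T>.Default)
            .Throttle(dueTime, scheduler ?? DefaultScheduler.Instance);

    // Small virtual-time check of DebounceDistinct.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var subject = new Subject<int>();
        var results = new List<int>();

        using (subject.DebounceDistinct(TimeSpan.FromSeconds(1), scheduler: scheduler)
                      .Subscribe(x => results.Add(x)))
        {
            subject.OnNext(1);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100)); // quiet long enough: 1 is emitted
            subject.OnNext(1);                                    // consecutive duplicate: filtered out
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
            subject.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            subject.OnNext(3);                                    // replaces the pending 2
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100)); // 3 is emitted
        }

        return results;
    }
}
```

Two differences from the hand-rolled version in the question are worth keeping in mind. Throttle flushes a pending element when the source completes, whereas the timer-based code discards it. DistinctUntilChanged also suppresses consecutive duplicates at all times, not only while a timer is pending. Whether that is what you want depends on your use case.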