How to implement a .Debounce() extension method for observable streams in RXNet (C#)?


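Basically what the title says. I can't find any ready-made .Debounce() extension method (similar to the way Throttle() is available out of the box).

I have posted the .Debounce() implementation I came up with as an answer, but I'm not 100% sure its quality is high enough. Any peer review is welcome.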

c# .net reactive-programming system.reactive rxnet
1 answer

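Here is a rough implementation I came up with. I have a feeling it could be improved, though, given that I am generating a child observable, which doesn't seem like best practice: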

    /// <summary>
    /// Emits an element from the source observable sequence if and only if there are no other elements emitted during a given period of time specified by debounceTime.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence to debounce.</param>
    /// <param name="debounceTime">Debouncing duration for each element.</param>
    /// <param name="customComparer">The comparer used to compare each element with the previous one. Defaults to EqualityComparer&lt;TSource&gt;.Default when null.</param>
    /// <returns>The debounced sequence.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="debounceTime"/> is less than or equal to TimeSpan.Zero.</exception>
    /// <remarks>
    /// <para>
    /// This operator debounces the source sequence by holding on to each element for the duration specified in <paramref name="debounceTime"/>. If another
    /// element is produced within this time window, the previous element is dropped and a new timer is started for the current element to try and debounce it,
    /// thus restarting this whole process. For streams that never have gaps larger than or equal to <paramref name="debounceTime"/> between elements, the resulting
    /// stream won't produce any elements at all. In order to reduce the volume of a stream whilst guaranteeing the periodic production of elements, consider using the
    /// Observable.Sample set of operators.
    /// </para>
    /// </remarks>
    public static IObservable<TSource> Debounce<TSource>(this IObservable<TSource> source, TimeSpan debounceTime, IEqualityComparer<TSource> customComparer = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (debounceTime <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(debounceTime));

        var debouncedElementsStream = Observable.Create<TSource>(Subscribe);

        return debouncedElementsStream;
        
        IDisposable Subscribe(IObserver<TSource> observer)
        {
            // Per-subscription state: the pending debounce timer (System.Threading.Timer)
            // and the last element for which a timer was started.
            Timer timer = null;
            var previousElement = default(TSource);

            var comparer = customComparer ?? EqualityComparer<TSource>.Default;

            return source.Subscribe(onNext: OnEachElementEmission_, onError: OnError_, onCompleted: OnCompleted_);

            void OnEachElementEmission_(TSource value)
            {
                if (timer != null && comparer.Equals(previousElement, value)) return;

                previousElement = value;

                timer?.Dispose();
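                // Fire once, after debounceTime has elapsed with no newer element.
                // (A dueTime of zero would fire immediately and defeat the debounce.)
                timer = new Timer(
                    callback: _ => OnElementHasRemainedStableForTheSpecificPeriod(value),
                    state: null,
                    dueTime: debounceTime,
                    period: Timeout.InfiniteTimeSpan
                );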
            }

            void OnError_(Exception exception)
            {
                timer?.Dispose();
                timer = null;

                observer.OnError(exception);
            }

            void OnCompleted_()
            {
                timer?.Dispose();
                timer = null;

                observer.OnCompleted();
            }

            void OnElementHasRemainedStableForTheSpecificPeriod(TSource value)
            {
                timer?.Dispose();
                timer = null;

                // Emit the debounced element: it has remained stable for the whole
                // period of time specified by debounceTime.
                observer.OnNext(value);
            }
        }
    }
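
For comparison, here is a minimal sketch of the same idea built only from operators that ship with System.Reactive. It relies on the documented behavior of Rx.NET's Throttle, which ignores an element when another one follows it within the given duration, so it already restarts the wait on every new element. The names DebounceSketch, DebounceWithBuiltIns and RunBurstDemo are hypothetical and chosen only for illustration. This is not a drop-in equivalent of the code above: DistinctUntilChanged suppresses consecutive duplicates for the whole lifetime of the stream, whereas the implementation above only ignores a duplicate while a timer is still pending.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class DebounceSketch
{
    // Hypothetical helper: debounce composed from built-in Rx.NET operators.
    public static IObservable<TSource> DebounceWithBuiltIns<TSource>(
        this IObservable<TSource> source,
        TimeSpan debounceTime,
        IEqualityComparer<TSource> comparer = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (debounceTime <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(debounceTime));

        return source
            // Drop consecutive duplicates so they do not restart the wait.
            .DistinctUntilChanged(comparer ?? EqualityComparer<TSource>.Default)
            // Emit an element only if no other element follows within debounceTime.
            .Throttle(debounceTime);
    }

    // Pushes a quick burst of three values, waits well past the debounce
    // window, and returns whatever the debounced stream emitted.
    public static List<int> RunBurstDemo()
    {
        var results = new List<int>();

        using (var subject = new Subject<int>())
        using (subject.DebounceWithBuiltIns(TimeSpan.FromMilliseconds(100))
                      .Subscribe(v => { lock (results) results.Add(v); }))
        {
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);

            // Throttle emits on a scheduler thread, so give it time to fire.
            Thread.Sleep(500);
        }

        lock (results) return new List<int>(results);
    }
}
```

With the burst in RunBurstDemo, only the last value (3) is expected to come through, since 1 and 2 are each followed by another element inside the 100 ms window. Because the built-in operators handle timer disposal and synchronization internally, this composition avoids the hand-managed Timer, at the cost of the slightly different duplicate handling noted above.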