Rx.NET 缓冲区在取消时发出所有项目

问题描述 投票:0回答:1

我正在以最简单的方式使用 Rx.NET 库及其方法

Buffer
,例如:

observable.Buffer(TimeSpan.FromSeconds(5), 10);

除了激活取消令牌的情况外,它效果很好。当这种情况发生时,我希望

Buffer
发出它当时持有的所有事件,而不是等到计时器滴答作响。这可能吗?

示例: 我在 2 秒内发出了第 1、2、3 项,因此未达到 10 项限制,也未达到 5 秒限制。现在请求取消,我想让所有缓冲的项目在结束请求/进程之前至少“看到”它们,而不需要额外等待 3 秒的计时器。

c# .net system.reactive rx.net
1个回答
0
投票

如果我理解正确的话,我会看到可能的解决方案:

    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));

    static void Main()
    {
        var cts = new CancellationTokenSource(6000);

        Observable.Interval(TimeSpan.FromSeconds(0.5))
            .TakeUntil(cts.Token.ToObservable())
            .Buffer(10)
            .Subscribe(b=>
            {
                if(cts.IsCancellationRequested) Console.WriteLine($"Discarded count: {b.Count}");
                else Console.WriteLine($"Processed count: {b.Count}");
            });
        Console.ReadLine();
    }

主要思想是在取消请求时完成 Buffer 的源序列 - 它使 Buffer 立即释放收集的项目并完成其工作,稍后在 Subscribe 上您只需检查是否请求取消并在数据到达时做出适当的决定。

© www.soinside.com 2019 - 2024. All rights reserved.