我正在以最简单的方式使用 Rx.NET 库及其方法
Buffer
,例如:
observable.Buffer(TimeSpan.FromSeconds(5), 10);
除了激活取消令牌的情况外,它效果很好。当这种情况发生时,我希望
Buffer
发出它当时持有的所有事件,而不是等到计时器滴答作响。这可能吗?
示例: 我在 2 秒内发出了第 1、2、3 项,因此未达到 10 项限制,也未达到 5 秒限制。现在请求取消,我想让所有缓冲的项目在结束请求/进程之前至少“看到”它们,而不需要额外等待 3 秒的计时器。
如果我理解正确的话,我会看到可能的解决方案:
public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
Observable.Create<Unit>(observer => ct.Register(() =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
}));
static void Main()
{
var cts = new CancellationTokenSource(6000);
Observable.Interval(TimeSpan.FromSeconds(0.5))
.TakeUntil(cts.Token.ToObservable())
.Buffer(10)
.Subscribe(b=>
{
if(cts.IsCancellationRequested) Console.WriteLine($"Discarded count: {b.Count}");
else Console.WriteLine($"Processed count: {b.Count}");
});
Console.ReadLine();
}
主要思想是在取消请求时完成 Buffer 的源序列 - 它使 Buffer 立即释放收集的项目并完成其工作,稍后在 Subscribe 上您只需检查是否请求取消并在数据到达时做出适当的决定。