The Observable.Repeat不可阻挡,是错误还是功能?

问题描述 投票:0回答:1

当源可观察者的通知是同步的时,我注意到Repeat运算符的行为有些奇怪。结果的observable无法用后续的Repeat运算符停止,并且显然将永远继续运行。为了演示,我创建了一个可观察的源,该源产生一个值,该值在每次订阅时都会递增。第一个订户获得值1,第二个订户获得值2等等:

TakeWhile

然后我将运算符TakeWhileint incrementalValue = 0; var incremental = Observable.Create<int>(async o => { await Task.CompletedTask; //await Task.Yield(); Thread.Sleep(100); var value = Interlocked.Increment(ref incrementalValue); o.OnNext(value); o.OnCompleted(); }); Repeat附加到此可观察对象,以便程序将等待直到所组合的可观察对象产生其最后一个值:

TakeWhile

这是该程序的输出:

LastAsync

它永远不会结束!尽管incremental.Repeat() .Do(new CustomObserver("Checkpoint A")) .TakeWhile(item => item <= 5) .Do(new CustomObserver("Checkpoint B")) .LastAsync() .Do(new CustomObserver("Checkpoint C")) .Wait(); Console.WriteLine($"Done"); class CustomObserver : IObserver<int> { private readonly string _name; public CustomObserver(string name) => _name = name; public void OnNext(int value) => Console.WriteLine($"{_name}: {value}"); public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}"); public void OnCompleted() => Console.WriteLine($"{_name}: Completed"); } 已产生其值并完成了操作,但Checkpoint A: 1 Checkpoint B: 1 Checkpoint A: 2 Checkpoint B: 2 Checkpoint A: 3 Checkpoint B: 3 Checkpoint A: 4 Checkpoint B: 4 Checkpoint A: 5 Checkpoint B: 5 Checkpoint A: 6 Checkpoint B: Completed Checkpoint C: 5 Checkpoint C: Completed Checkpoint A: 7 Checkpoint A: 8 Checkpoint A: 9 Checkpoint A: 10 Checkpoint A: 11 Checkpoint A: 12 Checkpoint A: 13 Checkpoint A: 14 Checkpoint A: 15 Checkpoint A: 16 Checkpoint A: 17 ... 操作员仍在旋转!

仅当可观察到的源同步通知其订户时,才会发生这种情况。例如,在取消注释LastAsync行之后,程序将按预期方式运行:

Repeat

//await Task.Yield();运算符停止旋转,尽管它没有报告完成(我猜是已取消订阅)。>>

是否有任何方法可以从Checkpoint A: 1 Checkpoint B: 1 Checkpoint A: 2 Checkpoint B: 2 Checkpoint A: 3 Checkpoint B: 3 Checkpoint A: 4 Checkpoint B: 4 Checkpoint A: 5 Checkpoint B: 5 Checkpoint A: 6 Checkpoint B: Completed Checkpoint C: 5 Checkpoint C: Completed Done 运算符实现一致的行为,而不管其接收的通知类型(同步还是异步)?

。NET Core 3.0,C#8,System.Reactive 4.3.2,控制台应用程序

当源可观察者的通知是同步的时,我注意到Repeat运算符的行为有些奇怪。后续的TakeWhile无法停止产生的可观察对象...

c# system.reactive rx.net
1个回答
0
投票

[您可能希望Repeat的实现具有Repeat通知,但就Repeat而言,它变为OnCompleted-无限流。

© www.soinside.com 2019 - 2024. All rights reserved.