当源可观察者的通知是同步的时,我注意到Repeat
运算符的行为有些奇怪。结果的observable无法用后续的Repeat
运算符停止,并且显然将永远继续运行。为了演示,我创建了一个可观察的源,该源产生一个值,该值在每次订阅时都会递增。第一个订户获得值1,第二个订户获得值2等等:
TakeWhile
然后我将运算符TakeWhile
,int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
await Task.CompletedTask;
//await Task.Yield();
Thread.Sleep(100);
var value = Interlocked.Increment(ref incrementalValue);
o.OnNext(value);
o.OnCompleted();
});
和Repeat
附加到此可观察对象,以便程序将等待直到所组合的可观察对象产生其最后一个值:
TakeWhile
这是该程序的输出:
LastAsync
它永远不会结束!尽管incremental.Repeat()
.Do(new CustomObserver("Checkpoint A"))
.TakeWhile(item => item <= 5)
.Do(new CustomObserver("Checkpoint B"))
.LastAsync()
.Do(new CustomObserver("Checkpoint C"))
.Wait();
Console.WriteLine($"Done");
class CustomObserver : IObserver<int>
{
private readonly string _name;
public CustomObserver(string name) => _name = name;
public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}
已产生其值并完成了操作,但Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...
操作员仍在旋转!
仅当可观察到的源同步通知其订户时,才会发生这种情况。例如,在取消注释LastAsync
行之后,程序将按预期方式运行:
Repeat
//await Task.Yield();
运算符停止旋转,尽管它没有报告完成(我猜是已取消订阅)。>>
是否有任何方法可以从Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
运算符实现一致的行为,而不管其接收的通知类型(同步还是异步)?
。NET Core 3.0,C#8,System.Reactive 4.3.2,控制台应用程序
当源可观察者的通知是同步的时,我注意到Repeat运算符的行为有些奇怪。后续的TakeWhile无法停止产生的可观察对象...
[您可能希望Repeat
的实现具有Repeat
通知,但就Repeat
而言,它变为OnCompleted
-无限流。