这是出于好奇的教育,出于好奇。考虑以下代码段:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
SubscribeDebug
订阅了一个简单的观察者:
public class DebugObserver<T> : IObserver<T>
{
public void OnCompleted()
{
Debug.WriteLine("Completed");
}
public void OnError(Exception error)
{
Debug.WriteLine("Error");
}
public void OnNext(T value)
{
Debug.WriteLine("Value: {0}", value);
}
}
此输出为:
值:0
值:1
值:2
值:3
值:4
然后阻止。有人可以帮助我了解发生这种情况的根本原因以及可观察到的原因没有完成的原因吗?我注意到,它不需要Concat
调用就可以完成,但是可以阻止。
我看过the source的ToObservable
并提炼出一个最小的实现。它确实重现了我们所看到的行为。
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext()) //<-- culprit
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec);
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
没有这个问题的症结在于CurrentThreadScheduler
的行为,这是使用的默认调度程序。
CurrentThreadScheduler
的行为是,如果在调用Schedule
时时间表正在已经运行,则最终会排队。
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
此打印2 1
。这种排队行为是我们的撤销。
[调用observer.OnCompleted()
时,它将导致Concat
开始下一个枚举-但是,事情与开始时不同-在尝试安排下一个枚举时,我们仍然在observer => { }
块内。因此,不是立即执行,而是将下一个计划排入队列。
现在enumerator.MoveNext()
陷入死锁。它不能移动到下一个项目-MoveNext
处于阻塞状态,直到下一个项目到达-仅在由ToObservable
循环安排时才能到达。
但是,调度程序只能在退出ToEnumerable
时通知被阻止的MoveNext()
和随后的loopRec
,但由于首先被MoveNext
阻止,因此它无法通知。 >
附录
这大约是ToEnumerable
(来自GetEnumerator.cs)(不是有效的实现):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable) { var gate = new SemaphoreSlim(0); var queue = new ConcurrentQueue<T>(); using(observable.Subscribe( value => { queue.Enqueue(value); gate.Release(); }, () => gate.Release())) while (true) { gate.Wait(); //this is where it blocks if (queue.TryDequeue(out var current)) yield return current; else break; } }
Enumerables在生成下一个项目之前一直处于阻塞状态,这就是为什么要实现门控的原因。不是
Enumerable.Range
会阻止,而是ToEnumerable
。