我创建了以下扩展,我可以在其中使
IObservable.Subscribe
可等待,但包装整个订阅。
当前 C# 异步编程和下面的代码中的一个大问题是不可等待的
onNext
、onError
、onCompleted
。
有没有办法解决并拥有异步/等待代码?我错过的任何扩展方法;
public static Task Observe<T>(this IObservable<T> observable,
Action<T> onNext,
Func<Exception, EventWaitHandle, ExceptionDispatchInfo> onError,
Action<EventWaitHandle, CancellationToken> onCompleted,
CancellationToken cancellationToken
) where T : class
{
var compositeDisposables = new CompositeDisposable();
var waitHandle = new ManualResetEvent(false);
compositeDisposables.Add(waitHandle);
var disposable = observable.Subscribe(
// This should be await onNext
onNext,
// This should be await onError
e => onError(e, waitHandle),
// This should be await onCompleted
() => onCompleted(waitHandle, cancellationToken));
compositeDisposables.Add(disposable);
waitHandle.WaitOne();
compositeDisposables.Dispose();
return Task.CompletedTask;
}
我知道有关于 onNext 异步的解决方案,但不涵盖 onError 和 onCompleted。
您已经可以等待可观察的了。考虑这段代码:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int value = await observable;
Console.WriteLine(value);
}
这会在控制台上产生
9
值。可观察到的最后一个值。
如果你想要第一个,只需这样做:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int value = await observable.Take(1);
Console.WriteLine(value);
}
这会产生
0
。
如果您想等待所有值,请尝试以下操作:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int[] values = await observable.ToArray();
Console.WriteLine(String.Join(", ", values));
}
这会产生
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
。