如何使异步等待响应式扩展 onNext、onError、onCompleted 委托

问题描述 投票:0回答:1

我创建了以下扩展,我可以在其中使

IObservable.Subscribe
可等待,但包装整个订阅。

当前 C# 异步编程和下面的代码中的一个大问题是不可等待的

onNext
onError
onCompleted

有没有办法解决并拥有异步/等待代码?我错过的任何扩展方法;

public static Task Observe<T>(this IObservable<T> observable,
        Action<T> onNext,
        Func<Exception, EventWaitHandle, ExceptionDispatchInfo> onError,
        Action<EventWaitHandle, CancellationToken> onCompleted,
        CancellationToken cancellationToken
    ) where T : class
    {
        var compositeDisposables = new CompositeDisposable();
        var waitHandle = new ManualResetEvent(false);
        compositeDisposables.Add(waitHandle);

        var disposable = observable.Subscribe(
            // This should be await onNext
            onNext, 
            // This should be await onError
            e => onError(e, waitHandle),
            // This should be await onCompleted
            () => onCompleted(waitHandle, cancellationToken));
        
        compositeDisposables.Add(disposable);

        waitHandle.WaitOne();
        compositeDisposables.Dispose();

        return Task.CompletedTask;
    }

我知道有关于 onNext 异步的解决方案,但不涵盖 onError 和 onCompleted。

c# async-await system.reactive
1个回答
0
投票

您已经可以等待可观察的了。考虑这段代码:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int value = await observable;
    
    Console.WriteLine(value);
}

这会在控制台上产生

9
值。可观察到的最后一个值。

如果你想要第一个,只需这样做:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int value = await observable.Take(1);
    
    Console.WriteLine(value);
}

这会产生

0

如果您想等待所有值,请尝试以下操作:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int[] values = await observable.ToArray();
    
    Console.WriteLine(String.Join(", ", values));
}

这会产生

0, 1, 2, 3, 4, 5, 6, 7, 8, 9

© www.soinside.com 2019 - 2024. All rights reserved.