Reactive Extensions Observerable.FromAsync: 如何等到异步操作完成后再进行操作?

问题描述 投票:1回答:1

在我的应用程序中,我从一个消息总线(rabbit mq,但实际上这并不重要)获取事件。这些事件的处理时间很长,而且事件是以突发的方式产生的。一次只能处理一个事件。为了实现这一点,我使用Rx来对事件进行序列化,并异步执行处理,以避免阻塞生产者。

下面是(简化的)示例代码。

static void Main(string[] args)
{
   var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
   var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
                           .Concat()
                           .Subscribe();

   Console.ReadLine();
   subscription.Dispose();
   // Wait until finished?
   Console.WriteLine("Completed");
}

private static async Task Process(CancellationToken cts, long i)
{
   Console.WriteLine($"Processing {i} ...");
   await Task.Delay(1000).ConfigureAwait(false);
   Console.WriteLine($"Finished processing {i}");
}

样本应用程序是在处理订阅,然后应用程序被终止。但在这个示例中,应用程序在终止时,订阅前收到的最后一个事件仍在处理中。

问题是 最好的方法是在处理完订阅后,等待最后一个事件的处理?我还在试图理解异步Rx的含义。我想有一个很简单的方法可以做到这一点,只是我现在还没有看到。

c# system.reactive
1个回答
2
投票

如果你能找到一个快速和简单的解决方案,适用于像这样的简单情况,那么你可以只用 WaitIObservable 而不是订阅它。如果你想收到类似订阅的通知,请使用 Do 操作员在最后的 Wait:

static void Main(string[] args)
{
    var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
    input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
        .Concat()
        .Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
        .Wait();
    Console.WriteLine("Completed");
}

1
投票

感谢Theodor的建议,我现在找到了一个适合我的解决方案。

static async Task Main(string[] args)
{
    var inputSequence = Observable.Interval(TimeSpan.FromMilliseconds(500));
    var terminate = new Subject<Unit>();
    var task = Execute(inputSequence, terminate);

    Console.ReadLine();
    terminate.OnNext(Unit.Default);
    await task.ConfigureAwait(false);
    Console.WriteLine($"Completed");
    Console.ReadLine();
}

private static async Task Process(CancellationToken cts, long i)
{
    Console.WriteLine($"Processing {i} ...");
    await Task.Delay(1000).ConfigureAwait(false);
    Console.WriteLine($"Finished processing {i}");
}

private static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
{
    await input
          .TakeUntil(terminate)
          .Select(i => Observable.FromAsync(ct => Process(ct, i)))
          .Concat();
}
© www.soinside.com 2019 - 2024. All rights reserved.