如何等待一个observable完成?

问题描述 投票:0回答:1

在Node.js中,你可以设置一个服务器,只要服务器还活着,并且在事件循环中处于活跃状态,这个进程就不会终止。我想知道是否可以使用反应式扩展来做这样的事情?我知道如何设置一个命名管道,并利用它与另一个进程中的Node服务器进行通信,但我不知道如何通过等待按键以外的其他方式来防止程序终止。我想把该服务器作为反应式管道的一部分来实现。能否在流水线完成之前阻塞主线程?

.net f# reactive-programming system.reactive
1个回答
0
投票
open System
open System.Threading
open FSharp.Control.Reactive

module Observable = 
    /// Blocks the thread until the observable completes. 
    let waitUnit on_next (o : IObservable<_>) =
        use t = new ManualResetEventSlim()
        use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
        t.Wait()

Observable.interval (TimeSpan.FromSeconds(1.0))
|> Observable.take 3
|> Observable.waitUnit (printfn "%i...")

printfn "Done."

以上是使用 F# Rx绑定但C#版本也会类似。该库本身有 Observable.wait 属于 IObservable<'a> -> 'a但这个方法的缺点是会在空序列上抛出一个异常,并且会在内存中保留最新的值,直到观测值被处理掉。如果最终值不重要的话,这个有正确的语义。

我看了一下引擎盖下的内容,发现 wait 用途 ManualResetEventSlim 来阻止它所在的线程,所以这应该没问题。


0
投票

很抱歉,我不能在F#中给出答案,但在C#中这很简单。

如果我有这个例子:

Observable
    .Range(0, 10)
    .Subscribe(x => Console.WriteLine(x));

...而我想等到它完成,我可以这样改写:

    Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray()
        .Wait();

...或者

    await Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray();

正如Marko在评论中指出的那样,这种方法会创建一个可能相当大的数组。为了解决这个问题,你可以将 .ToArray().LastAsync() (该名称是在普遍使用 Async 以示 Task - 在这种情况下,它只是将观测到的最后一个元素作为一个 IObservable<T>).

.Wait() 确实抛出了一个空的观测值,但是 await 版本没有。

© www.soinside.com 2019 - 2024. All rights reserved.