在F#中一次执行一个异步观察

问题描述 投票:0回答:1

我有一个需要观察的事物序列,一个C#任务。这些C#任务不应同时运行,而应在之后另一个。基本上,我需要实现与此C#等效的F#问题:

Enforcing one async observable at a time

天真地翻译此C#代码会得到如下内容:

let run (idx:int) (delay:int) =
    async {
        sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
        let! t = System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
        sprintf "finish: %i" idx  |> System.Diagnostics.Trace.WriteLine
        t
    }

    Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
    |> Observable.take 20
    |> Observable.mapi(fun idx delay -> idx, delay)
    |> Observable.bind(fun (idx, delay) -> Observable.ofAsync (run idx delay))
    |> Observable.subscribe ignore
    |> ignore

这无法按预期工作,因为我不在任何地方等待结果。甚至有一种在F#中执行此操作而不阻塞线程的方法,就像C#的等待会一样吗?

f# async-await system.reactive
1个回答
0
投票

F#中存在一个方便的库,称为AsyncSeq:https://www.nuget.org/packages/FSharp.Control.AsyncSeq/

与添加到C#8.0中的IAsyncEnumerable<T>非常相似,这为您提供了一个不错的解决方案。

let run (idx:int) (delay:int) =
    async {
        sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
        do! System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
        sprintf "finish: %i" idx  |> System.Diagnostics.Trace.WriteLine
    }

Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
|> Observable.take 20
|> Observable.mapi(fun idx delay -> idx, delay)
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.iterAsync (fun (idx,delay) -> run idx delay)
|> Async.RunSynchronously
© www.soinside.com 2019 - 2024. All rights reserved.