将带有IO操作的无限循环转化为可观察的循环。

问题描述 投票:0回答:1

你好,我想把一个长期运行的任务(每次迭代时从socket中读取数据)转换为 "任务"。Observable.这项任务收到的是 CancellationToken.

一些模型

public class Model
{
   public string Data{get;set;}
}

长期任务

    CancellationTokenSource cts=new CancellationTokenSource();
    ClientWebSocket socket=await ConnectTheWebSocketAsync();
    Task t1=Task.Run(async()=>{
       while(true)
       {
          cts.Token.ThrowIfCancellationRequested();
          byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
          var rec = await socket.ReceiveAsync(rawInput, cts.Token);
          Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>();  //some wrapper for serialization
          ArrayPool<byte>.Shared.Return(rawInput);
          //i want to push this result into an observable
          //Could i use an Observable.Empty and put data in it here ?
         // myObs.OnNext(result);
       }
    },cts.Token);

可观察到的企图

我一直在寻找方法来创建这个可观察到的,我有偶然发现 FromAsync:

 var obs = Observable.FromAsync((cts) => {
                byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
                var rec = await socket.ReceiveAsync(rawInput, cts.Token);
                Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>();  //some wrapper for serialization
                ArrayPool<byte>.Shared.Return(rawInput);
                return result;

            }).Repeat();
  • 我不明白的是,我怎么把一个。CancellationToken 在...上 Repeat 如果我想让观察到的东西停止.
  • .我已经看到了 FromAsync 也有过载与 CancellationToken...我假设这个标记将被传递给每个元素,我会把它注入到套接字中的。ReceiveAsync 也。这将给我控制当前的 ReceiveAsync 或当前元素的操作,但它仍然不会停止。Repeat.

所以我的问题是。

Observable.FromAsync((token)=> [some io action]).Repeat();

鉴于上述模式: - 我如何停止一个单一的迭代(从套接字读取) - 我如何停止整个循环?Repeat)--在哪里会有 FromAsync 来自

io observable system.reactive
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.