你好,我想把一个长期运行的任务(每次迭代时从socket中读取数据)转换为 "任务"。Observable
.这项任务收到的是 CancellationToken
.
一些模型
public class Model
{
public string Data{get;set;}
}
长期任务
CancellationTokenSource cts=new CancellationTokenSource();
ClientWebSocket socket=await ConnectTheWebSocketAsync();
Task t1=Task.Run(async()=>{
while(true)
{
cts.Token.ThrowIfCancellationRequested();
byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
var rec = await socket.ReceiveAsync(rawInput, cts.Token);
Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>(); //some wrapper for serialization
ArrayPool<byte>.Shared.Return(rawInput);
//i want to push this result into an observable
//Could i use an Observable.Empty and put data in it here ?
// myObs.OnNext(result);
}
},cts.Token);
可观察到的企图
我一直在寻找方法来创建这个可观察到的,我有偶然发现 FromAsync
:
var obs = Observable.FromAsync((cts) => {
byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
var rec = await socket.ReceiveAsync(rawInput, cts.Token);
Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>(); //some wrapper for serialization
ArrayPool<byte>.Shared.Return(rawInput);
return result;
}).Repeat();
CancellationToken
在...上 Repeat
如果我想让观察到的东西停止.FromAsync
也有过载与 CancellationToken
...我假设这个标记将被传递给每个元素,我会把它注入到套接字中的。ReceiveAsync
也。这将给我控制当前的 ReceiveAsync
或当前元素的操作,但它仍然不会停止。Repeat
.所以我的问题是。
Observable.FromAsync((token)=> [some io action]).Repeat();
鉴于上述模式: - 我如何停止一个单一的迭代(从套接字读取) - 我如何停止整个循环?Repeat
)--在哪里会有 FromAsync
来自