如何从IObservable中拉取

问题描述 投票:0回答:2

假设端点

A
处有一个 Subject<T>,端点
B
处有 IObservable<T>

Endpoint A 使用 OnNext() 发送 T 的一个对象,并且从不调用 OnComplete()。我无法控制它,它在外部组件中。当我在我的端点 B 上订阅时,我的 lambda 没有被调用:

endpoint.Subscribe(t => { doSomething(); });

但是,当我这样订阅时:

using var cts = new CancellationTokenSource();
await endpoint.ForEachAsync(t => { doSomething(); /* cts.Cancel(); */ }, cts.Token);

确实让我的 lambda 被调用。但有一个缺点。由于发布者从不调用 OnCompleted(),因此除非我在 doSomething() 方法之后调用

cts.Cancel()
,否则我无法转义该异步任务。它有点有效,但我不喜欢将异常作为流程的一部分抛出的想法。有更好的办法吗?

c# observable task-parallel-library system.reactive reactive
2个回答
0
投票

经过多次实验我终于找到了解决办法:

using var cts = new CancellationTokenSource();
cts.CancelAfter(timeout);  // Some timeout
var timeoutTask = Task.Delay(timeout, cts.Token);
var stopperTask = new TaskCompletionSource();
var subscription = endpoint
                   .ObserveOn(CurrentThreadScheduler.Instance)
                   .Subscribe(t => {
                      // doSomething();
                      stopperTask.SetResult();
                   });
await Task.WhenAny(stopperTask.Task, timeoutTask);
subscription.Dispose();
if (!stopperTask.Task.IsCompleted) stopperTask.SetResult();
if (!timeoutTask.IsCompleted) cts.Cancel();

突然订阅开始接收数据,但实际上我订阅后需要一点时间才能获取数据,它带有延迟。这就是为什么我想出了这个不是最优雅的解决方案。至少它不会抛出 TaskCancelledException。当 stopperTask 或 timeoutTask 完成时,我会优雅地处理所有内容。

对于每个此类调用,Visual Studio 都会显示一个死锁任务。我尝试了几乎所有可用的调度程序,

CurrentThreadScheduler
工作得很好。


0
投票

首先,你的描述有点令人困惑。如果您的

endpoint
发送了一个值而没有完成,那么您的订阅应该仍然可以正常执行。如果不完成或处理订阅,它就不会结束。

这很容易解决。只需添加一个

.Take(1)

endpoint.Take(1).Subscribe(t => { doSomething(); });

第一个值后会自动完成。

如果值到达有延迟并且您想设置超时,那么您有两个选择。

(1) 您可以添加

.Timeout(timeout)
运算符(如果
timeout
TimeSpan
)。

endpoint
    .Timeout(timeout)
    .Subscribe(
        t => { doSomething(); },
        ex => { /* timeout here */ });

(2) 您可以使用

.Amb
运算符 - 它根据两个输入可观察量中的哪一个首先触发来生成一个值 - 如果未出现实际值,则在一段时间后生成默认值。

endpoint
    .Take(1)
    .Amb(Observable.Timer(timeout).Select(_ => -1))
    .Subscribe(t => { doSomething(); });

如果超时,您会得到

t == -1
- 假设端点正在生成一个整数。

当订阅被处置时,所有这些选项都会取消 - 因此无需浪费取消令牌。

© www.soinside.com 2019 - 2024. All rights reserved.