假设端点
A处有一个
Subject<T>
,端点 B处有
IObservable<T>
。
Endpoint A 使用 OnNext() 发送 T 的一个对象,并且从不调用 OnComplete()。我无法控制它,它在外部组件中。当我在我的端点 B 上订阅时,我的 lambda 没有被调用:
endpoint.Subscribe(t => { doSomething(); });
但是,当我这样订阅时:
using var cts = new CancellationTokenSource();
await endpoint.ForEachAsync(t => { doSomething(); /* cts.Cancel(); */ }, cts.Token);
我确实让我的 lambda 被调用。但有一个缺点。由于发布者从不调用 OnCompleted(),因此除非我在 doSomething() 方法之后调用
cts.Cancel()
,否则我无法转义该异步任务。它有点有效,但我不喜欢将异常作为流程的一部分抛出的想法。有更好的办法吗?
经过多次实验我终于找到了解决办法:
using var cts = new CancellationTokenSource();
cts.CancelAfter(timeout); // Some timeout
var timeoutTask = Task.Delay(timeout, cts.Token);
var stopperTask = new TaskCompletionSource();
var subscription = endpoint
.ObserveOn(CurrentThreadScheduler.Instance)
.Subscribe(t => {
// doSomething();
stopperTask.SetResult();
});
await Task.WhenAny(stopperTask.Task, timeoutTask);
subscription.Dispose();
if (!stopperTask.Task.IsCompleted) stopperTask.SetResult();
if (!timeoutTask.IsCompleted) cts.Cancel();
突然订阅开始接收数据,但实际上我订阅后需要一点时间才能获取数据,它带有延迟。这就是为什么我想出了这个不是最优雅的解决方案。至少它不会抛出 TaskCancelledException。当 stopperTask 或 timeoutTask 完成时,我会优雅地处理所有内容。
对于每个此类调用,Visual Studio 都会显示一个死锁任务。我尝试了几乎所有可用的调度程序,
CurrentThreadScheduler
工作得很好。
首先,你的描述有点令人困惑。如果您的
endpoint
发送了一个值而没有完成,那么您的订阅应该仍然可以正常执行。如果不完成或处理订阅,它就不会结束。
这很容易解决。只需添加一个
.Take(1)
。
endpoint.Take(1).Subscribe(t => { doSomething(); });
第一个值后会自动完成。
如果值到达有延迟并且您想设置超时,那么您有两个选择。
(1) 您可以添加
.Timeout(timeout)
运算符(如果 timeout
是 TimeSpan
)。
endpoint
.Timeout(timeout)
.Subscribe(
t => { doSomething(); },
ex => { /* timeout here */ });
(2) 您可以使用
.Amb
运算符 - 它根据两个输入可观察量中的哪一个首先触发来生成一个值 - 如果未出现实际值,则在一段时间后生成默认值。
endpoint
.Take(1)
.Amb(Observable.Timer(timeout).Select(_ => -1))
.Subscribe(t => { doSomething(); });
如果超时,您会得到
t == -1
- 假设端点正在生成一个整数。
当订阅被处置时,所有这些选项都会取消 - 因此无需浪费取消令牌。