我正在寻找某种最佳实践或模式来处理以下情况:
我有两个要通过以下方式组合的流:
|-------------|
| Transform A |------------>|---------|
|---------|----->|-------------| | |
Stream A----->| Split A | | Combine |-------> ?
|---------|----->|--------------------| | All |
| Transform and | | |
| Combine A (latest) |----->|---------|
| and B (latest) |
Stream B---------------------->|--------------------|
流A和B正在异步生成事件。我想结合A和B的最新信息,并将结果与A之上的计算结果结合起来。
[接收到B中的事件时,整个管道将使用该事件中的值以及A中的最新值来运行。
是否有一种优雅的方法来确保在收到A中的事件时,Combine All会运行基于A中此事件的事件,并避免在[[Transform A和转换并合并A和B
Observable.Publish(source, x => ...)
可确保您仅拥有source
的一个订阅,但您可以将其用作变量x
的任意次数。它确保您永远不会在source
上获得任何比赛条件。您可以这样使用它:
Func<A, C> transform_A = a => ...;
Func<A, B, D> combine_A_and_B = (a, b) => ...;
Func<C, D, E> combine_all = (c, d) => ...;
IObservable<A> stream_a = ...;
IObservable<B> stream_b = ...;
IObservable<E> query =
stream_a
.Publish(stream_a_published =>
{
var stream_c = stream_a_published.Select(transform_A);
var stream_d = stream_a_published.CombineLatest(stream_b, combine_A_and_B);
return stream_c.CombineLatest(stream_d, combine_all);
});
class ManualExecutor : Executor {
private val tasks = ArrayDeque<Runnable>()
override fun execute(command: Runnable) = tasks.push(command)
fun runAllTasks() {
while (tasks.isNotEmpty()) {
tasks.pop().run()
}
}
}
val a: Observable<A>
val b: Observable<B>
val scheduler = Schedulers.from(ManualExecutor())
val aTransformed = a.observeOn(scheduler).map { transformA(it) }
val aCombinedWithB = combine(a, b).observeOn(scheduler)
val final = combine(aTransformed, aCombinedWithB).debounce(0)
// some time later....
emitA() // now all the updates are queued in our ManualExecutor
scheduler.runAllTasks() // final will only emit once, not twice!
当然,这不是开箱即用的,您必须弄乱调度程序和测试以使其正确,但是也许这个想法会有所帮助。如果使用具有“零”超时的去抖动感觉有点太过分,您还可以使用其他签名,以可观察的方式完全控制去抖动周期。但是,如果您的预期用途不是一般情况,并且仅针对上述情况,则可以通过类似以下方式简化问题:
|-----------------------| Stream A----->| map A to | | pair(A, transform(A)) | |-----------------------| | |----->|--------------------| | Combine A, t(A) | | and B (latest) |--> ? | into 3-tuple | Stream B---------------------->|--------------------|
Publish
完成。所有订阅者将看到相同的可观察对象。base B上具有A的最新值的管道,您可以具有:
streamB.WithLatestFrom(streamA)
为了确保A和B不参与竞争,请在同一事件循环上观察它们两个,以防止B发出直到完成A的工作。
var eventloop = new EventLoopScheduler();
var streamA = interval.ObserveOn(eventloop);
var streamB = interval.ObserveOn(eventloop);
全部放在一起:
var interval = Observable.Interval(TimeSpan.FromSeconds(.1)).Skip(1); var eventloop = new EventLoopScheduler(); var streamA = interval.ObserveOn(eventloop).Publish(); var streamB = interval.ObserveOn(eventloop).Select(i => i.ToString("x")); var transformA = streamA.Select(i => i.ToString("000")); var transformAB = streamB.WithLatestFrom(streamA, (b, a) => (a, b)); var combineAll = transformAB.WithLatestFrom(transformA, (ab, a_) => (a_, ab.a, ab.b)); combineAll.Subscribe(v => Console.WriteLine($"A: {v.a}, A': {v.a_}, B: {v.b}")); streamA.Connect();
输出:
A: 1, A': 001, B: 1 A: 1, A': 001, B: 2 A: 2, A': 002, B: 3 A: 3, A': 003, B: 4 A: 4, A': 004, B: 5 A: 5, A': 005, B: 6 A: 6, A': 006, B: 7 A: 7, A': 007, B: 8 A: 8, A': 008, B: 9 A: 9, A': 009, B: a A: 10, A': 010, B: b A: 11, A': 011, B: c A: 12, A': 012, B: d A: 13, A': 013, B: e A: 14, A': 014, B: f A: 15, A': 015, B: 10
谜语提示,而不是:
var streamA = ObserveOn(eventloop).Publish()
最好将整个管道写在闭包内部:
interval.ObserveOn(eventloop).Publish(streamA => { ... });