然后将流与RX合并

问题描述 投票:0回答:3

我正在寻找某种最佳实践或模式来处理以下情况:

我有两个要通过以下方式组合的流:

                               |-------------|
                               | Transform A |------------>|---------|
              |---------|----->|-------------|             |         |
Stream A----->| Split A |                                  | Combine |-------> ?
              |---------|----->|--------------------|      |   All   |
                               | Transform and      |      |         |
                               | Combine A (latest) |----->|---------|
                               | and B (latest)     |
Stream B---------------------->|--------------------|

流A和B正在异步生成事件。我想结合A和B的最新信息,并将结果与​​A之上的计算结果结合起来。

[接收到B中的事件时,整个管道将使用该事件中的值以及A中的最新值来运行。

是否有一种优雅的方法来确保在收到A中的事件时,Combine All会运行基于A中此事件的事件,并避免在[[Transform A和转换并合并A和B

concurrency rxjs rx-java rx-java2 system.reactive
3个回答
0
投票
使用Observable.Publish(source, x => ...)可确保您仅拥有source的一个订阅,但您可以将其用作变量x的任意次数。它确保您永远不会在source上获得任何比赛条件。

您可以这样使用它:

Func<A, C> transform_A = a => ...; Func<A, B, D> combine_A_and_B = (a, b) => ...; Func<C, D, E> combine_all = (c, d) => ...; IObservable<A> stream_a = ...; IObservable<B> stream_b = ...; IObservable<E> query = stream_a .Publish(stream_a_published => { var stream_c = stream_a_published.Select(transform_A); var stream_d = stream_a_published.CombineLatest(stream_b, combine_A_and_B); return stream_c.CombineLatest(stream_d, combine_all); });


0
投票
您可以通过将单个线程的调度程序与debounce运算符组合在一起来进行此操作:

class ManualExecutor : Executor { private val tasks = ArrayDeque<Runnable>() override fun execute(command: Runnable) = tasks.push(command) fun runAllTasks() { while (tasks.isNotEmpty()) { tasks.pop().run() } } } val a: Observable<A> val b: Observable<B> val scheduler = Schedulers.from(ManualExecutor()) val aTransformed = a.observeOn(scheduler).map { transformA(it) } val aCombinedWithB = combine(a, b).observeOn(scheduler) val final = combine(aTransformed, aCombinedWithB).debounce(0) // some time later.... emitA() // now all the updates are queued in our ManualExecutor scheduler.runAllTasks() // final will only emit once, not twice!

当然,这不是开箱即用的,您必须弄乱调度程序和测试以使其正确,但是也许这个想法会有所帮助。如果使用具有“零”超时的去抖动感觉有点太过分,您还可以使用其他签名,以可观察的方式完全控制去抖动周期。

但是,如果您的预期用途不是一般情况,并且仅针对上述情况,则可以通过类似以下方式简化问题:

|-----------------------| Stream A----->| map A to | | pair(A, transform(A)) | |-----------------------| | |----->|--------------------| | Combine A, t(A) | | and B (latest) |--> ? | into 3-tuple | Stream B---------------------->|--------------------|


-1
投票
  1. 确实没有拆分功能,但是您可以使用可观察的共享源,并可以使用Publish完成。所有订阅者将看到相同的可观察对象。
  2. base B上具有A的最新值的管道,您可以具有:

streamB.WithLatestFrom(streamA)
    为了确保A和B不参与竞争,请在同一事件循环上观察它们两个,以防止B发出直到完成A的工作。
  • var eventloop = new EventLoopScheduler(); var streamA = interval.ObserveOn(eventloop); var streamB = interval.ObserveOn(eventloop);
    全部放在一起:

    var interval = Observable.Interval(TimeSpan.FromSeconds(.1)).Skip(1); var eventloop = new EventLoopScheduler(); var streamA = interval.ObserveOn(eventloop).Publish(); var streamB = interval.ObserveOn(eventloop).Select(i => i.ToString("x")); var transformA = streamA.Select(i => i.ToString("000")); var transformAB = streamB.WithLatestFrom(streamA, (b, a) => (a, b)); var combineAll = transformAB.WithLatestFrom(transformA, (ab, a_) => (a_, ab.a, ab.b)); combineAll.Subscribe(v => Console.WriteLine($"A: {v.a}, A': {v.a_}, B: {v.b}")); streamA.Connect();

    输出:

    A: 1, A': 001, B: 1 A: 1, A': 001, B: 2 A: 2, A': 002, B: 3 A: 3, A': 003, B: 4 A: 4, A': 004, B: 5 A: 5, A': 005, B: 6 A: 6, A': 006, B: 7 A: 7, A': 007, B: 8 A: 8, A': 008, B: 9 A: 9, A': 009, B: a A: 10, A': 010, B: b A: 11, A': 011, B: c A: 12, A': 012, B: d A: 13, A': 013, B: e A: 14, A': 014, B: f A: 15, A': 015, B: 10

    谜语提示,而不是:

    var streamA = ObserveOn(eventloop).Publish()

    最好将整个管道写在闭包内部:

    interval.ObserveOn(eventloop).Publish(streamA => { ... });

  • © www.soinside.com 2019 - 2024. All rights reserved.