RxJava上的运算符Observable / Flowable延迟发射n个项目

问题描述 投票:0回答:1

我想对Flowable进行转换,以使其延迟发射项目,直到收集到指定数量的项目为止,然后按照FIFO顺序将它们发射到下游,以保持恒定的延迟项目计数。上游发出onComplete信号后,应在发出onComplete之前将排队的项目刷新到下游:

(在此示例中,延迟的商品编号为3)

1 2 3 4 5 6 7 |
      1 2 3 4 5 6 7 |

我看不到任何现有的运算符可以执行此操作,也可以对其进行修改以实现该行为。 Observable.delay似乎仅支持基于时间的延迟,不支持基于计数的延迟。

实现自定义运算符应该很容易实现这一点,但是现有的运算符也许有更简单的方法?

java rx-java reactive-programming rx-java2
1个回答
0
投票
您可以发布序列,以N跳过自我压缩,然后在最后N后面附加:

Flowable.range(1, 7) .flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v)) .doOnNext(v -> System.out.println(v)) // ------------------------------------------------------------------- .publish(f -> f.skip(3).zipWith(f, (a, b) -> b).mergeWith(f.takeLast(3)) ) // ------------------------------------------------------------------- .blockingSubscribe(v -> System.out.println("<-- " + v));

© www.soinside.com 2019 - 2024. All rights reserved.