RxJava 。在最后一次发射后开始心跳,直到下一次发射。

问题描述 投票:0回答:1

我正在研究Http SSE流,我必须保持http连接的活力。为此,我每15秒发送一个空消息(命名为心跳,其他都是真实事件)。

source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat"))) 

这个解决方案的缺点是,如果最后一个真实事件是在< 15 SECONDS内发送的,我就会发送心跳事件。

我想要的是在最后一个接收到的消息是15 SECONDS后开始发送心跳事件,当得到真实事件时停止发送该事件。

类似于下面的内容。

---x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x

  • x是真实事件
  • h是心跳事件
  • - 等于1秒

感谢任何帮助:)

rx-java rx-java2
1个回答
0
投票

我想 switchMap 操作员是你要找的。

source
    .map(EventSource.Event::event)
    .startWith(event("").withName("heartbeat"))
    .switchMap (event -> interval(15, TimeUnit.SECONDS)
        .map(t -> event("").withName("heartbeat"))
        .startWith(event)
    )

EDIT

正如你在评论中提到的,即使没有任何事件,你也需要启动 heardBeat。为此你需要添加一些 Init 融入你的 EventSource.Event 并使用 filter() 操作符,以便以后忽略它。见编辑的代码。

编辑2

去掉初始延迟(initialDelay),让它从HeartBeat事件开始。

© www.soinside.com 2019 - 2024. All rights reserved.