我正在研究Http SSE流,我必须保持http连接的活力。为此,我每15秒发送一个空消息(命名为心跳,其他都是真实事件)。
source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat")))
这个解决方案的缺点是,如果最后一个真实事件是在< 15 SECONDS内发送的,我就会发送心跳事件。
我想要的是在最后一个接收到的消息是15 SECONDS后开始发送心跳事件,当得到真实事件时停止发送该事件。
类似于下面的内容。
---x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x-x
感谢任何帮助:)
我想 switchMap
操作员是你要找的。
source
.map(EventSource.Event::event)
.startWith(event("").withName("heartbeat"))
.switchMap (event -> interval(15, TimeUnit.SECONDS)
.map(t -> event("").withName("heartbeat"))
.startWith(event)
)
EDIT
正如你在评论中提到的,即使没有任何事件,你也需要启动 heardBeat。为此你需要添加一些 Init
融入你的 EventSource.Event
并使用 filter()
操作符,以便以后忽略它。见编辑的代码。
编辑2
去掉初始延迟(initialDelay),让它从HeartBeat事件开始。