我有一个天气消息流,每条消息都包含一个站点 ID 和一堆与天气相关的参数。每 5 分钟就会有来自三到四个不同站点的新消息。
我想构建一个 Flux,根据站点 ID 将消息拆分为单独的 Flux,缓存或重播每个站点的最后 12 条消息,然后将它们合并回订阅者的单个 Flux。
我尝试了以下方法,在weatherMessagesSink上发出30条消息,然后订阅,所有消息都由第一个订阅接收。然而,第二个 subscribe() 不会接收任何缓存的消息,但会接收随后在接收器上发出的新消息。
weatherMessagesSink
.asFlux()
.groupBy(weatherMessage -> weatherMessage.getSiteID())
.flatMap(groupedFlux -> groupedFlux.cache(12)
更简单的方法可能是仅重播最后 4 * 12 条消息,但如果添加或删除站点,这会有点脆弱。此外,如果每个站点的消息进入速率不同,那么也意味着每个站点缓存的消息数量不同。
经过大量搜索和一些试验和错误,我发现以下内容可以按要求工作。
weatherMessagesSink
.asFlux()
.groupBy(weatherMessage -> weatherMessage.getSite())
.map(groupedFlux -> groupedFlux.cache(12))
.cache()
.flatMap(groupedFlux -> groupedFlux);
作为 Project Reactor 的新手,我并没有发现这一点特别明显,但我想我明白它现在在做什么。希望这对其他想要解决类似问题的人有用。