将 Flux 拆分为分组 Flux,重播或缓存每个分组 Flux,然后合并

问题描述 投票:0回答:1

我有一个天气消息流,每条消息都包含一个站点 ID 和一堆与天气相关的参数。每 5 分钟就会有来自三到四个不同站点的新消息。

我想构建一个 Flux,根据站点 ID 将消息拆分为单独的 Flux,缓存或重播每个站点的最后 12 条消息,然后将它们合并回订阅者的单个 Flux。

我尝试了以下方法,在weatherMessagesSink上发出30条消息,然后订阅,所有消息都由第一个订阅接收。然而,第二个 subscribe() 不会接收任何缓存的消息,但会接收随后在接收器上发出的新消息。

weatherMessagesSink
  .asFlux()
  .groupBy(weatherMessage -> weatherMessage.getSiteID())
  .flatMap(groupedFlux -> groupedFlux.cache(12)

更简单的方法可能是仅重播最后 4 * 12 条消息,但如果添加或删除站点,这会有点脆弱。此外,如果每个站点的消息进入速率不同,那么也意味着每个站点缓存的消息数量不同。

reactive-programming project-reactor
1个回答
0
投票

经过大量搜索和一些试验和错误,我发现以下内容可以按要求工作。

weatherMessagesSink
        .asFlux()
        .groupBy(weatherMessage -> weatherMessage.getSite())
        .map(groupedFlux -> groupedFlux.cache(12))
        .cache()
        .flatMap(groupedFlux -> groupedFlux);

作为 Project Reactor 的新手,我并没有发现这一点特别明显,但我想我明白它现在在做什么。希望这对其他想要解决类似问题的人有用。

© www.soinside.com 2019 - 2024. All rights reserved.