我有以下场景,有两个端点(我们称之为 A 和 B)返回一些值,我需要处理这些值并将其报告给第三个端点(我们称之为 C)。无法组合/处理的值我需要在最后向相同的第三端点 (C) 报告。
我通过从每个端点 A 和 B 创建一个 Flux 来做到这一点,然后当我处理组合值时,我将这些结果发布到端点 C。如果无法处理这些值,我会将它们存储在 Map 中。
经过一番思考,我决定从这个 Map 创建第三个 Flux,当两个端点停止返回值时我应该处理它。当它们停止时,我完成通量 A 和 B 中的每一个,这反过来应该完成combineLatest
。
flux1
确保当其中一个完成时我仍然会轮询 A 或 B。我有一个从上面的地图创建 Flux 的方法。我希望这个方法只有在第一个 Flux 完成时才会被调用。我尝试通过做combineLatest
来做到这一点。
下面是一些伪代码:switchIfEmpty
第一部分,flux1处理工作。但 Flux2 从未被处理。起初我认为这是因为所讨论的 Map 一开始就是空的,因为只有当 Flux1 完成处理所有值时它才会有东西在里面。但后来我发现创建
var flux1 = Flux.combineLatest(fluxA, fluxB, Tuple<A,B>)
.processRecords(Tuple<A,B>)
.prepareDataToBeSent();
var flux2 = createFluxFromValuesInMap();
flux1.switchIfEmpty(flux2)
.flatMap(sendResult)
.subscribe();
的方法甚至没有被调用。
那是因为flux2
实际上已经完成了吗?
我不知道如何使用反应堆做到这一点?更新:经过一些测试,我想如果
flux1
完成,
flux1
将不会开始发射。但是,如果其中一个通量完成,是否有办法链接通量?因此,仅当上游流从未发出值时才会触发switchIfEmpty。
您想要的是 flux1.concatWith(flux2),在
flux2
完成后开始 flux2
,无论有或没有发出元素。