将Flux的结果与Mono的结果相结合

问题描述 投票:2回答:2

我开始使用Project reactor和一个我很难挣扎的地方,我如何将来自Mono和Flux的东西结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.

现在我想要实现的是:

如何组合UserFlux的响应并将这些用户与该组相关联,例如group.addUsers(userfromFlux)。

有人可以帮助我们如何组合来自userFlux和groupMono的结果。我想我使用像Zip这样的东西,然后它从源头做一对一的映射。就我而言,我需要做1到N的映射。在这里,我有一个组,但我需要添加到该组的多个用户。返回Mono<List<Users>然后使用单声道的zip操作符并提供这里提到的组合器是一个好主意 public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)

spring-webflux project-reactor reactive-streams
2个回答
3
投票

我认为Flux.combineLatest静态方法可以帮助你:因为你的Mono只发射1个元素,那个元素将永远是与Flux的每个输入值组合的元素。

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                   groupMono, userFlux);

0
投票

我使用了Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)为其他人添加答案。我使用这种方法而不是@Simon的建议,因为持有用户的基础组是HashSet,并且以被动方式添加用户将不是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon的建议。

© www.soinside.com 2019 - 2024. All rights reserved.