我开始使用Project reactor和一个我很难挣扎的地方,我如何将来自Mono和Flux的东西结合起来。这是我的用例:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
现在我想要实现的是:
如何组合UserFlux的响应并将这些用户与该组相关联,例如group.addUsers(userfromFlux)。
有人可以帮助我们如何组合来自userFlux和groupMono的结果。我想我使用像Zip这样的东西,然后它从源头做一对一的映射。就我而言,我需要做1到N的映射。在这里,我有一个组,但我需要添加到该组的多个用户。返回Mono<List<Users>
然后使用单声道的zip操作符并提供这里提到的组合器是一个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)
?
我认为Flux.combineLatest
静态方法可以帮助你:因为你的Mono
只发射1个元素,那个元素将永远是与Flux
的每个输入值组合的元素。
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
我使用了Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)
为其他人添加答案。我使用这种方法而不是@Simon的建议,因为持有用户的基础组是HashSet
,并且以被动方式添加用户将不是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon的建议。