Spring Webflux 在返回响应之前等待所有 Flux.fromIterable

问题描述 投票:0回答:1

我有一个反应式服务并且运行良好

public Mono<GetCasesListResponse> getCasesList(@RequestBody GetCasesListRequest request) {
        return casesService.getCasesList(mapToGetCasesListServiceRequest(request))
                .map(GetCasesListMapper::mapGetCasesListResponse)
                .onErrorResume(exceptionHandler::fallbackMethod);
}

案例服务

public Mono<GetCasesListServiceResponse> getCasesList(GetCasesListServiceRequest request) {
        return webClient.post()
                .uri("/getCasesList")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(GetCasesListResponse.class);
}

但现在我有下一个任务。对于地图中的每个元素,并行从另一个服务以及列表中获取数据。并将此数据添加到 GetCasesListResponse 中数组的每个元素。 我决定使用 Flux.fromIterable

首先添加flatMap

public Mono<GetCasesListResponse> getCasesList(@RequestBody GetCasesListRequest request) {
        return casesService.getCasesList(mapToGetCasesListServiceRequest(request))
                .map(GetCasesListMapper::mapGetCasesListResponse)
                .flatMap(this::getMoreComments)
                .onErrorResume(exceptionHandler::fallbackMethod);
}
private Mono<GetCasesListResponse> getMoreComments(GetCasesListResponse casesListResponse) {
        collectExtIds(casesListResponse).forEach((id, extIdList) -> fromIterable(extIdList).parallel()
                .flatMap(extId -> stpService.getMoreComments(mapToGetMoreCommentsRequest(extId)))
                .subscribe(moreCommentsResponse -> addMoreCommentsToGetCasesListResponse(casesListResponse, id, moreCommentsResponse)));

        return Mono.just(casesListResponse);
}

获取更多评论

public Mono<GetCommentResponse> getMoreComments(GetCommentRequest request) {
        return webClient.post()
                .uri("/getCaseComments")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(GetCommentResponse.class)
}

但是我对 getMoreComments 有问题,因为此方法在 fromIterable 完成之前返回 Mono.just(casesListResponse) 。我得到了没有任何附加数据的caseListResponse。我如何等待从 fromIterable 接收每个元素的数据?

spring-boot spring-webflux project-reactor
1个回答
0
投票

有几个问题:

  • 你不应该使用
    subscribe
    ,这是你应该避免使用的重击规则,除非你知道为什么你到底需要它并且不能依赖其他方式
  • 通常,当您想要在其他反应流之后返回一个值时,您可以使用 then(result)
    
    
  • 但是在这种情况下,您实际上返回相同的对象,您只是修改它。一般来说,这是不好的。
  • 好的代码会将所有评论累积到一个列表中(使用
  • reduce
    ),并仅在最后一步将其设置为结果。但是,如果您确定您的对象是线程安全的,您可以在代码中执行类似的操作(正如我在示例中所示的那样)
  • 顺便说一句,使用
  • parallel
     在你的情况下没有任何作用。它仅对高 CPU 
    map
     有用,并且需要 
    runOn
     配置。标准 
    flatMap
     已经平行。
应该是这样的:

Mono<GetCasesListResponse> getMoreComments( GetCasesListResponse casesListResponse ) { Flux.fromIterable(collectExtIds(casesListResponse)) .flatMap( (id, extIdList) -> Flux.fromIterable(extIdList) .flatMap(extId -> stpService.getMoreComments( mapToGetMoreCommentsRequest(extId) ) ) .reduce(casesListResponse, (accum, moreComments) -> addMoreCommentsToGetCasesListResponse( accum, id, moreComments) ) ) ) .last() }
我在这里使用 

last

 只是因为所有 
flatMap
 返回 
casesListResponse
 对象的相同实例。但如果你可以做一个
then(casesListResponse)
来代替。另外我建议以一种可以接受所有评论列表的方式重写
GetCasesListResponse
,这样reduce就不会操作共享实例

© www.soinside.com 2019 - 2024. All rights reserved.