Synchronizing Spring WebFlux calls to preserve the order of operations

Question  Votes: 0  Answers: 3

I have a simple use case and am not sure how to implement it, as I am new to Spring WebFlux.

I am using the Spring Boot WebFlux starters. I need to call two endpoints; let's call them Endpoint1 and Endpoint2.

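Whenever Endpoint1 is hit with a request, I should first hit Endpoint2 with the same request and use the response from Endpoint2 to enrich the original request before doing anything further. In other words, the Endpoint1 request object must be enriched with Endpoint2's response before anything else happens. How can I enforce this order with Spring WebFlux? In my case, the original request object is not enriched before it is used further. Any help is much appreciated!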

FYI: Endpoint2 is called using WebClient.

Just pseudocode:

public Mono<Response1> endpoint1(Request1 request1){

  Flux<Response2> reponse2 = webclient.getEndpoint2(request1); // Returns a Flux

  //use the above reponse2 to enrich the request1

  return webclient.getSomething(request1); //Returns Mono<Response1>

}

Actual code:


 public Mono<ApplicationResponse> save(ApplicationRequest request) {

        return Mono.subscriberContext().flatMap(ctx -> {

            Mono blockingWrapper =  Mono.fromCallable(() ->
                    service.getId(request)
                            .subscriberContext(ctx)
                            .subscribe(id -> request.setId(id))
            ).subscribeOn(Schedulers.elastic());

            return blockingWrapper.flatMap(o -> authService.getAccessToken()
                    .flatMap(token -> post("/save", request,
                            token.getAccessToken(),
                            ctx)
                            .bodyToMono(ApplicationResponse.class))
                    .log());
        });
    }
spring-boot spring-webflux project-reactor reactor
3 Answers

2 votes

If you are sure that getEndpoint2(request1) returns a Flux, you can use collectList() in that case:

return webclient.getEndpoint2(request1) // Flux<Response2>
         .collectList() // Mono<List<Response2>>
         .flatMap(list -> {
            // ... should handle empty list if needed
            Request1 finalRequest = createRequest(request1, list); // build the enriched request
            return webclient.getSomething(finalRequest); // Mono<Response1>
         });

0 votes

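I am seeing something interesting. If I orchestrate this from the Controller class, it works as expected, whereas if I call a service from my Controller class to orchestrate the flow, it does not seem to work as expected. Am I missing something, or is this how it is supposed to work?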

Here is the working code:

@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;
    private final FeesService feesService;

    @PostMapping(value = "/save")
    public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {

        ApplicationRequest applicationRequest = requestMapper.apply(request);

        return Mono.subscriberContext()
                .flatMap(context -> feesService.calculateApplicationFees(applicationRequest)
                        .collectList())
                .map(feeItems -> applicationRequest.getFeeItems().addAll(feeItems))
                .flatMap(isRequestEnriched -> applicationService.saveApplication(applicationRequest)
                        .map(saveApplicationResponse -> {
                            Application application = new Application();
                            application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                            return application;
                        }))
                .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                        request.getLicenceId()),
                        throwable, true, false))
                .log();
    }
}


@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;  

         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                .flatMap(context -> authenticationService.getAccessToken()
                        .flatMap(token -> post("/save",
                                request,
                                token.getAccessToken(),
                                context)
                                .bodyToMono(SaveApplicationResponse.class))
                        .log());
    }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

If I do it this way, it does not work, meaning the request is never enriched:



@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;

     @PostMapping(value = "/save")
        public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
            return Mono.subscriberContext()
                    .flatMap(context -> applicationService.saveApplication(requestMapper.apply(request))
                            .map(saveApplicationResponse -> {
                                Application application = new Application();
                                application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                                return application;
                            }))
                    .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                            request.getLicenceId()),
                            throwable, true, false))
                    .log();
        }

}

@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;
     private final FeesService feesService;


         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                    .flatMap(context -> feesService.calculateApplicationFees(request)
                            .collectList())
                    .map(feeItems -> request.getFeeItems().addAll(feeItems))
                    .subscriberContext()
                    .flatMap(context -> authenticationService.getAccessToken()
                            .flatMap(token -> post("/save",
                                    request,
                                    token.getAccessToken(),
                                    context)
                                    .bodyToMono(SaveApplicationResponse.class))
                            .log());
        }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}


0 votes

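Your problem comes from the second .subscriberContext(). It is a static method that creates a new Mono, which means the code before it is never executed; that is why the request object does not change.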
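To make the static-call pitfall concrete, here is a minimal sketch in plain Java. It does not use Reactor: `Lazy` is a hypothetical stand-in for a lazy type such as Mono, and `Lazy.context()` only plays the role of the no-arg `Mono.subscriberContext()`. All names in it are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class StaticCallDemo {

    // Hypothetical lazy pipeline type; NOT Reactor's Mono, just an illustration.
    static final class Lazy<T> {
        private final Supplier<T> source;

        private Lazy(Supplier<T> source) { this.source = source; }

        static <T> Lazy<T> of(Supplier<T> source) { return new Lazy<>(source); }

        // Static factory: builds a brand-new pipeline that has no upstream.
        static Lazy<String> context() { return new Lazy<>(() -> "ctx"); }

        // Instance operators keep the current pipeline as their upstream.
        <R> Lazy<R> map(Function<T, R> fn) {
            return new Lazy<>(() -> fn.apply(source.get()));
        }

        <R> Lazy<R> flatMap(Function<T, Lazy<R>> fn) {
            return new Lazy<>(() -> fn.apply(source.get()).run());
        }

        // Nothing executes until run() is called (comparable to subscribing).
        T run() { return source.get(); }
    }

    // Mirrors the failing service: the enrichment step is assembled but never run.
    static List<String> broken() {
        List<String> fees = new ArrayList<>();
        Lazy.of(() -> "fee-1")
                .map(item -> fees.add(item)) // enrichment step
                .context()                   // static call via an instance: upstream is discarded
                .map(ctx -> "saved")
                .run();
        return fees;
    }

    // Chaining through an instance operator keeps the enrichment step upstream.
    static List<String> fixed() {
        List<String> fees = new ArrayList<>();
        Lazy.of(() -> "fee-1")
                .map(item -> fees.add(item))
                .flatMap(added -> Lazy.context()) // upstream runs first, then the context is read
                .map(ctx -> "saved")
                .run();
        return fees;
    }

    public static void main(String[] args) {
        System.out.println("broken: " + broken());
        System.out.println("fixed:  " + fixed());
    }
}
```

In `broken()` the list stays empty because Java evaluates the expression to the left of a static call and then throws the result away. In the failing service, the equivalent change would presumably be to read the context inside a flatMap, as `fixed()` does, rather than calling the static method on the chain.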

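In any case, your code is messy. Make it simpler. As far as I can tell from your code, you do not need Flux at all: feesService.calculateApplicationFees(...) should return Mono<List<FeeItem>>. There are too many unnecessary .log() and Mono.subscriberContext() calls. Do you even need the context here?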
