我有一个简单的用例,不知道该怎么做,因为我是新的春天webflux。
我正在使用spring boot webflux starters
。我需要调用2个端点。让我们说Endpoint1
和Endpoint2
。
每当Endpoint1
被请求命中时,我应首先使用相同的请求命中Endpoint2
并使用Endpoint2
的响应来丰富原始请求,然后再做一些事情。在做任何事情之前,需要使用Endpoint1
的响应来丰富Endpoint2
的请求对象。如何使用Spring webflux强制执行此订单?就我而言,原始请求对象在进一步使用之前不会被丰富。任何帮助都非常感谢!
仅供参考 - 使用Endpoint2
致电webclient
public Mono<Response1> endpoint1(Request1 request1){
Flux<Response2> reponse2 = webclient.getEndpoint2(request1); // Returns a Flux
//use the above reponse2 to enrich the request1
return webclient.getSomething(request1); //Returns Mono<Response1>
}
public Mono<ApplicationResponse> save(ApplicationRequest request) {
return Mono.subscriberContext().flatMap(ctx -> {
Mono blockingWrapper = Mono.fromCallable(() ->
service.getId(request)
.subscriberContext(ctx)
.subscribe(id -> request.setId(id))
).subscribeOn(Schedulers.elastic());
return blockingWrapper.flatMap(o -> authService.getAccessToken()
.flatMap(token -> post("/save", request,
token.getAccessToken(),
ctx)
.bodyToMono(ApplicationResponse.class))
.log());
});
}
如果你确定你有一个带有getEndpoint2(request1)的Flux,在这种情况下,你可以使用collectList():
return webclient.getEndpoint2(request1) // Flux<Response2>
.collectList() // Mono<List<Response2>>
.flatMap(list -> {
// ... should handle empty list if needed
finalRequest = createRequest(request1, list);
return webclient.getSomething(finalRequest); // Mono<Response1>
});
我看到一些有趣的事情发生了。如果我从Controller类中编排它,它会按预期工作,而如果我从我的Controller类调用一个服务来编排这个流程,它似乎没有按预期工作。只是想知道我错过了什么?或者这是它的工作方式?
这是工作代码:
@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {
private final ApplicationService applicationService;
private final ApplicationRequestMapper requestMapper;
private final FeesService feesService;
@PostMapping(value = "/save")
public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
ApplicationRequest applicationRequest = requestMapper.apply(request);
return Mono.subscriberContext()
.flatMap(context -> feesService.calculateApplicationFees(applicationRequest)
.collectList())
.map(feeItems -> applicationRequest.getFeeItems().addAll(feeItems))
.flatMap(isRequestEnriched -> applicationService.saveApplication(applicationRequest)
.map(saveApplicationResponse -> {
Application application = new Application();
application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
return application;
}))
.onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
request.getLicenceId()),
throwable, true, false))
.log();
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{
private final AuthenticationService authenticationService;
public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {
return Mono.subscriberContext()
.flatMap(context -> authenticationService.getAccessToken()
.flatMap(token -> post("/save",
request,
token.getAccessToken(),
context)
.bodyToMono(SaveApplicationResponse.class))
.log());
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{
private final AuthenticationService authenticationService;
public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {
return Mono.subscriberContext()
.flatMap(ctx -> authenticationService.getAccessToken()
.flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
.bodyToMono(FeeResponse.class))
.log())
.flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
}
}
如果我这样做就行不通了......意思是,请求永远不会丰富:
@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {
private final ApplicationService applicationService;
private final ApplicationRequestMapper requestMapper;
@PostMapping(value = "/save")
public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
return Mono.subscriberContext()
.flatMap(context -> applicationService.saveApplication(requestMapper.apply(request))
.map(saveApplicationResponse -> {
Application application = new Application();
application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
return application;
}))
.onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
request.getLicenceId()),
throwable, true, false))
.log();
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{
private final AuthenticationService authenticationService;
private final FeesService feesService;
public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {
return Mono.subscriberContext()
.flatMap(context -> feesService.calculateApplicationFees(request)
.collectList())
.map(feeItems -> request.getFeeItems().addAll(feeItems))
.subscriberContext()
.flatMap(context -> authenticationService.getAccessToken()
.flatMap(token -> post("/save",
request,
token.getAccessToken(),
context)
.bodyToMono(SaveApplicationResponse.class))
.log());
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{
private final AuthenticationService authenticationService;
public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {
return Mono.subscriberContext()
.flatMap(ctx -> authenticationService.getAccessToken()
.flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
.bodyToMono(FeeResponse.class))
.log())
.flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
}
}
你的问题来自第二个.subscriberContext()
。它是一个静态方法,它创建一个新的Mono
,这意味着它永远不会执行之前的代码,这就是为什么request
对象不会改变。
无论如何,你的代码很乱。让它更简单。至于我读你的代码,你根本不需要Flux
。 feesService.calculateApplicationFees(...)
应该返回Mono<List<FeeItem>>
。有太多不必要的.log()
或Mono.subscriberContext()
。你甚至需要这里的背景吗?