我正在开发一个基于Webflux的Spring boot应用程序(Spring boot 3.1.5和java 17)。 它将请求从前端应用程序传输到另一个外部应用程序(下面称为外部服务器)。 前端调用 My Controller.java,My Controller.java 调用我的 Service.java,后者调用我的 ClientAPI.java,后者调用外部服务器。 我还从另一个第二个服务器调用另一个服务(下面称为 DoneService 的服务)。 因此,我是 2 个外部服务器的消费者和前端应用程序的生产者。 为了执行我的测试,我用邮递员替换了前端应用程序。 因此,我的 Spring boot 应用程序是前端的生产者,但同时也是外部应用程序的消费者。
我的问题如下: 在外部服务器端,我可以在其日志中看到其服务被我的 springboot 应用程序调用了 4 次,而不是一次。 在我这边,日志表明(请参阅下面的我的日志):
为了检查我是否调用了外部应用程序 4 次,我使用 postman 来直接调用外部服务器服务。并且用邮递员正确调用一次。
我的Controller.java:
@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
private static final String PATH_SERVICE = "/pathservice";
@Autowired
Service service;
@PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
@RequestBody @Valid InputData inputData) {
Mono<TheResponseClass> response = service.save(requestId, inputData);
response.subscribe(
status -> log
.debug("Controller : response status = " + status.getResponse().getStatus()),
error -> log.error(
"Controller : The following error happened!"
+ error));
return response;
}
}
我的服务.java:
@Slf4j
@Service
public class Service {
@Autowired
ClientAPI clientAPI;
@Autowired
BuilRequestBody builRequestBody;
@Autowired
BuildHeaderRequest buildHeaderRequest;
@Autowired
DoneService doneService;
@Autowired
Config config;
public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
Mono<TheResponseClass> response = null;
if (inputData.getClassInfo() != null) {
response = clientAPI.sendRequest(
builRequestBody.buildRequest(inputData),
buildHeaderRequest.computeHeader(requestId, config.getHeaderID()));
response.subscribe(responseResult -> {
Status resultStatus = responseResult.getResponse().getStatus();
log.debug("Service : response status = " + resultStatus);
if (resultStatus == Status.SUCCESS){
//Call another service that uses WebClient implementation
doneService.notify();
}
}, error -> {
log.error("Service : error happened !"+ error);
});
} else {
TheResponseClass errorResponse = new TheResponseClass();
errorResponse.setName("name");
errorResponse.setOperation("operation");
response = Mono.just(errorResponse);
}
return response;
}
}
我的ClientAPI.java调用外部服务器:
@Slf4j
@Service
public class ClientAPI {
private WebClient webClient;
private String serverUrl;
// Retrieve CustomWebClient built with htts and token management
@Autowired
public ClientAPI(@Qualifier("CustomWebClient") WebClient webClient) {
this.serverUrl = "https://localhost:8456/myServer";
this.webClient = webClient;
}
public Mono<TheResponseClass> sendRequest(Request bodyRequest, HeaderParameters headerParameters) {
//log.debug("ClientAPI Header : REQUEST_ID = "+headerParameters.getRequestID().toString());
// other log.debug for multiple headers data...
Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
.body(Mono.just(bodyRequest), Request.class)
.retrieve()
.bodyToMono(TheResponseClass.class);
result.subscribe(responseResult -> {
ObjectMapper mapper = new ObjectMapper();
try {
String jsonOutput = mapper.writeValueAsString(responseResult);
log.info("ClientAPI : response result = "+jsonOutput);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}, error -> {
log.error("ClientAPI : The following error happened on response getStatus!", error);
});
return result;
}
}
我的 DoneService.java :
@Slf4j
@Service
public class DoneService {
@Autowired
DoneClientAPI doneClientAPI;
public Mono<DoneData> done(DoneRequest doneRequest) {
return doneClientAPI.sendRequest(doneRequest).flatMap(s -> {
.....
return Mono.just(s.getDoneDataList().get(0));
});
}
}
我的 DoneClientAPI.java :
@Slf4j
@Service
public class DoneClientAPI {
private WebClient webClient;
private static final String NOTIFY_URI = "/Done";
private String notificationUrl;
public Mono<DoneResponseList> sendRequest(DoneRequest doneRequest) {
return this.webClient.post().uri(this.notificationUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
.body(BodyInserters
.fromFormData(DoneRequest.SYSTEM_ISSUER, DoneRequest.SYSTEM_ISSUER)
.with(DoneRequest.DATE_REQUEST, doneRequest.getDate_request()))
.retrieve().bodyToMono(DoneResponseList.class);
}
}
我的一些简化数据模型:
来自前端的输入数据:
public class InputData {
private String Data;
private ClassInfo ClassInfo;
public void setData(String Data) {
this.Data = Data;
}
public void setClassInfo(ClassInfo ClassInfo) {
this.ClassInfo = ClassInfo;
}
public String getData() {
return Data;
}
public ClassInfo getClassInfo() {
return ClassInfo;
}
}
向外部服务器请求数据:
public class Request {
private String name;
private String operation;
public Request() {
}
public Request name(String name) {
this.name = name;
return this;
}
public String getName() {
return name;
}
public void setName(String string) {
this.name = string;
}
public Request operation(String operation) {
this.operation = operation;
return this;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
}
我的日志:
12:05:38.266 ERROR i.z.b.t.l.b.s.Service - Service : error happened !org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
12:05:38.401 ERROR i.z.b.t.l.w.controller.Controller - Controller : The following error happened on getStatus response!org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8486/myServer
12:05:38.676 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [506b1d63-1] 500 Server Error for HTTP POST "/v1/biometricprofileEnrol"
org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ 400 BAD_REQUEST from POST https://externalServerIP:8456/myServer [DefaultWebClient]
*__checkpoint ⇢ Handler package.webservice.controller.Controller#mySpringbootService(String, InputData) [DispatcherHandler]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP POST "myServer" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:485)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1466)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1329)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1378)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
12:05:39.041 INFO p.ClientAPI - ClientAPI : response result = {"name":"yourname","operation":"yourOperation","response":{"status":"SUCCESS","error_Code":null,"system_message":null}}
感谢 Toerktumlare 和其他输入,我删除了重复调用外部服务的订阅调用。 因为我已经通过控制器调用外部 API 进行了隐式订阅。 我使用 flatMap 来代替来操作接收到的数据。 我使用 subscribe 来调用 DoneService,因为我直接需要在我的方法之一中调用它(不是通过控制器直接从前端发出的请求)
因此以 Controller.java 为例:
@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
private static final String PATH_SERVICE = "/pathservice";
@Autowired
Service service;
@PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
@RequestBody @Valid InputData inputData) {
return service.save(requestId, inputData);
}
}
以及从 Service.java 调用我的 DoneService :
@Slf4j
@Service
public class Service {
@Autowired
ClientAPI clientAPI;
@Autowired
BuilRequestBody builRequestBody;
@Autowired
BuildHeaderRequest buildHeaderRequest;
@Autowired
DoneService doneService;
public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
Mono<TheResponseClass> response = null;
// builRequestBody and buildHeaderRequest are used to convert the inputData into the
// dedicated format for the external server API
return clientAPI.sendRequest(
builRequestBody.buildRequest(inputData),
buildHeaderRequest.computeHeader(requestId, config.getHeaderID())).flatMap(responseResult -> {
Status resultStatus = responseResult.getResponse().getStatus();
if (resultStatus == Status.SUCCESS) {
....
//Call another service that uses WebClient implementation
doneService.notify().subscribe();
}
return Mono.just(responseResult);
});
}
}