Unexpected automatic multiple calls to an external server when using WebClient

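I am developing a Spring Boot application based on WebFlux (Spring Boot 3.1.5 and Java 17). It forwards requests from a front-end application to another, external application (called the external server below). The front end calls my Controller.java, which calls my Service.java, which calls my ClientAPI.java, which calls the external server. I also call another service on a second server (called DoneService below). So my Spring Boot application is a consumer of two external servers and a producer for the front-end application. To run my tests, I replaced the front-end application with Postman.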

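My problem is the following: on the external server side, I can see in its logs that its service is called 4 times by my Spring Boot application instead of once. On my side, the logs show (see my logs below):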

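  • I call the external service only once.
  • The response status in my ClientAPI.java is SUCCESS. The input I send to the external server is correctly saved to its database. The server checks whether the data has already been inserted into its database and, if so, rejects the request, reporting that the data is already registered. That is the result I correctly get when I run my request a second time.
  • But the response in my Service.java and Controller.java fails with a 400 Bad Request error.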

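To check whether the external application is really being called 4 times by my code, I used Postman to call the external server's service directly. With Postman, it is correctly called once.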

My Controller.java:

@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
    private static final String PATH_SERVICE = "/pathservice";

    @Autowired
    Service service;

    @PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
            @RequestBody @Valid InputData inputData) {
        Mono<TheResponseClass> response = service.save(requestId, inputData);

        response.subscribe(
                status -> log
                        .debug("Controller :  response status = " + status.getResponse().getStatus()),
                error -> log.error(
                        "Controller : The following error happened!"
                                + error));

        return response;

    }
    
}

My Service.java:

@Slf4j
@Service
public class Service {

    @Autowired
    ClientAPI clientAPI;

    @Autowired
    BuilRequestBody builRequestBody;

    @Autowired
    BuildHeaderRequest buildHeaderRequest;
    
    @Autowired
    DoneService doneService;

    @Autowired
    Config config;

    public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {

        Mono<TheResponseClass> response = null;

        if (inputData.getClassInfo() != null) {

            response = clientAPI.sendRequest(
                    builRequestBody.buildRequest(inputData),
                    buildHeaderRequest.computeHeader(requestId, config.getHeaderID()));

            response.subscribe(responseResult -> {
                Status resultStatus = responseResult.getResponse().getStatus();
                log.debug("Service : response status = " + resultStatus);

                if (resultStatus == Status.SUCCESS){
                        //Call another service that uses WebClient implementation
                        doneService.notify();
                }

            }, error -> {
                log.error("Service : error happened !"+ error);
            });
        } else {
            TheResponseClass errorResponse = new TheResponseClass();
            errorResponse.setName("name");
            errorResponse.setOperation("operation");
            response = Mono.just(errorResponse);
        }
        return response;

    }

}

My ClientAPI.java, which calls the external server:

@Slf4j
@Service
public class ClientAPI {
    private WebClient webClient;
    private String serverUrl;

    // Retrieve CustomWebClient built with https and token management
    @Autowired
    public ClientAPI(@Qualifier("CustomWebClient") WebClient webClient) {
        this.serverUrl = "https://localhost:8456/myServer";
        this.webClient = webClient;
    }

    public Mono<TheResponseClass> sendRequest(Request bodyRequest, HeaderParameters headerParameters) {
        
        // log.debug("ClientAPI Header : REQUEST_ID = " + headerParameters.getRequestID().toString());
        // other log.debug for multiple headers data...
    
        Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
                .body(Mono.just(bodyRequest), Request.class)
                .retrieve()
                .bodyToMono(TheResponseClass.class);

        result.subscribe(responseResult -> {
            ObjectMapper mapper = new ObjectMapper();
            try {
                String jsonOutput = mapper.writeValueAsString(responseResult);
                log.info("ClientAPI : response result = "+jsonOutput);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

        }, error -> {
            log.error("ClientAPI : The following error happened on response getStatus!", error);
        });
        return result;
    }
}

My DoneService.java:

@Slf4j
@Service
public class DoneService {

    @Autowired
    DoneClientAPI doneClientAPI;

    public Mono<DoneData> done(DoneRequest doneRequest) {
        
        return doneClientAPI.sendRequest(doneRequest).flatMap(s -> {
            .....
            return Mono.just(s.getDoneDataList().get(0));
        });

    }

}

My DoneClientAPI.java:

@Slf4j
@Service
public class DoneClientAPI {
    private WebClient webClient;
    private static final String NOTIFY_URI = "/Done";
    private String notificationUrl;


    public Mono<DoneResponseList> sendRequest(DoneRequest doneRequest) {

        return this.webClient.post().uri(this.notificationUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
                .body(BodyInserters
                        .fromFormData(DoneRequest.SYSTEM_ISSUER, DoneRequest.SYSTEM_ISSUER)
                        .with(DoneRequest.DATE_REQUEST, doneRequest.getDate_request()))
                .retrieve().bodyToMono(DoneResponseList.class);

    }
}

Some of my simplified data models:

Input data from the front end:

public class InputData {
    private String Data;
    private ClassInfo ClassInfo;
    
    public void setData(String Data) {
        this.Data = Data;
    }

    public void setClassInfo(ClassInfo ClassInfo) {
        this.ClassInfo = ClassInfo;
    }

    public String getData() {
        return Data;
    }

    public ClassInfo getClassInfo() {
        return ClassInfo;
    }
}

Request data sent to the external server:

public class Request {

  private String name;
  private String operation;

   public Request() {
  }

  public Request name(String name) {
    
    this.name = name;
    return this;
  }
  
  public String getName() {
    return name;
  }

  public void setName(String string) {
    this.name = string;
  }


  public Request operation(String operation) {
    
    this.operation = operation;
    return this;
  }

  public String getOperation() {
    return operation;
  }

  public void setOperation(String operation) {
    this.operation = operation;
  }

}

My logs:

12:05:38.266 ERROR i.z.b.t.l.b.s.Service - Service : error happened !org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
12:05:38.401 ERROR i.z.b.t.l.w.controller.Controller - Controller :  The following error happened on getStatus response!org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8486/myServer
12:05:38.676 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [506b1d63-1]  500 Server Error for HTTP POST "/v1/biometricprofileEnrol"
org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 400 BAD_REQUEST from POST https://externalServerIP:8456/myServer [DefaultWebClient]
    *__checkpoint ⇢ Handler package.webservice.controller.Controller#mySpringbootService(String, InputData) [DispatcherHandler]
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP POST "myServer" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
        at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
        at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:485)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1466)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1329)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1378)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
12:05:39.041 INFO  p.ClientAPI - ClientAPI : response result = {"name":"yourname","operation":"yourOperation","response":{"status":"SUCCESS","error_Code":null,"system_message":null}}
java spring-webflux
1 Answer

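Thanks to Toerktumlare and other input, I removed the subscribe calls that caused the repeated calls to the external service, because WebFlux already subscribes implicitly to the Mono returned by the controller, and that subscription triggers the call to the external API. I use flatMap instead to work with the received data. I still use subscribe to call DoneService, because I need to call it directly inside one of my methods (it is not a request that comes from the front end through the controller).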
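To illustrate why each extra subscribe multiplied the requests, here is a minimal plain-Java model of a cold (lazy) publisher. It is not Reactor itself: `ColdCall`, `ColdCallDemo`, and `countExternalCalls` are hypothetical names invented for this sketch, and the "external call" is just a counter. The point it tries to show is that a cold source re-runs its work on every subscription, so three explicit subscribe calls (ClientAPI, Service, Controller) plus the framework's own subscription would be consistent with the 4 calls seen on the external server.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a cold Mono: nothing runs until subscribe() is
// called, and every subscribe() runs the source again.
class ColdCall<T> {
    private final Supplier<T> source;

    ColdCall(Supplier<T> source) {
        this.source = source;
    }

    void subscribe(Consumer<T> onNext) {
        onNext.accept(source.get());
    }
}

class ColdCallDemo {
    // Returns how many times the simulated external server is hit.
    static int countExternalCalls(int explicitSubscribes) {
        AtomicInteger externalCalls = new AtomicInteger();
        ColdCall<String> response = new ColdCall<>(() -> {
            externalCalls.incrementAndGet(); // simulated HTTP POST
            return "SUCCESS";
        });

        // Explicit subscribe() calls that were added only for logging
        for (int i = 0; i < explicitSubscribes; i++) {
            response.subscribe(status -> { });
        }
        // The framework subscribes once to whatever the controller returns
        response.subscribe(status -> { });

        return externalCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(countExternalCalls(3)); // prints 4
        System.out.println(countExternalCalls(0)); // prints 1
    }
}
```

With zero explicit subscriptions, only the framework's subscription remains, which matches the single call expected by the external server.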

Taking Controller.java as an example:

@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
    private static final String PATH_SERVICE = "/pathservice";

    @Autowired
    Service service;

    @PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
            @RequestBody @Valid InputData inputData) {
        return service.save(requestId, inputData);

    }
    
}

And the call to my DoneService from Service.java:

@Slf4j
@Service
public class Service {

    @Autowired
    ClientAPI clientAPI;

    @Autowired
    BuilRequestBody builRequestBody;

    @Autowired
    BuildHeaderRequest buildHeaderRequest;

    @Autowired
    DoneService doneService;

    @Autowired
    Config config;

    public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
            // builRequestBody and buildHeaderRequest convert the inputData into the
            // dedicated format for the external server API
            return clientAPI.sendRequest(
                    builRequestBody.buildRequest(inputData),
                    buildHeaderRequest.computeHeader(requestId, config.getHeaderID())).flatMap(responseResult -> {
                    Status resultStatus = responseResult.getResponse().getStatus();
                    if (resultStatus == Status.SUCCESS) {
                        ....
                        //Call another service that uses WebClient implementation
                        doneService.notify().subscribe();

                    }
                    return Mono.just(responseResult);
                });
    
    }
}
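As a possible variation on the approach above (not what the answer itself does), the logging that the original subscribe calls performed can be moved into `doOnNext` / `doOnError`, and the notification can be chained into the pipeline instead of being subscribed to separately. The sketch below assumes only `reactor-core` on the classpath; `ChainedNotifyDemo`, `sendRequest`, and `notifyDone` are hypothetical stand-ins for the real ClientAPI and DoneService calls, and the counters exist only to show how often each source runs. Note the trade-off: with chaining, the response is emitted only after the notification finishes, whereas the nested `subscribe()` in the answer is fire-and-forget.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class ChainedNotifyDemo {
    static final AtomicInteger externalCalls = new AtomicInteger();
    static final AtomicInteger notifications = new AtomicInteger();

    // Hypothetical stand-in for ClientAPI.sendRequest(...): a cold Mono
    static Mono<String> sendRequest() {
        return Mono.fromCallable(() -> {
            externalCalls.incrementAndGet(); // simulated HTTP POST
            return "SUCCESS";
        });
    }

    // Hypothetical stand-in for the DoneService notification
    static Mono<Void> notifyDone() {
        return Mono.fromRunnable(notifications::incrementAndGet);
    }

    static Mono<String> save() {
        return sendRequest()
                // Side-effect logging without adding a subscription
                .doOnNext(status -> System.out.println("Service : response status = " + status))
                .doOnError(error -> System.err.println("Service : error happened ! " + error))
                .flatMap(status -> "SUCCESS".equals(status)
                        // Run the notification in the same pipeline; ignore its
                        // failure so the main response is still returned
                        ? notifyDone().onErrorResume(e -> Mono.empty()).thenReturn(status)
                        : Mono.just(status));
    }

    public static void main(String[] args) {
        // block() plays the role of the single subscription made by WebFlux
        String status = save().block();
        System.out.println(status + " calls=" + externalCalls.get()
                + " notifications=" + notifications.get());
    }
}
```

Because nothing in `save()` subscribes on its own, the single subscription made by the caller drives both the external call and the notification exactly once.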