当 Mono.zip 中发生错误时,Spring WebFlux 返回响应,但继续处理它们

问题描述 投票:0回答:1

我有一个调用其他远程服务的 REST 服务(我们称之为“主服务”)。出于性能目的,当其中一个远程调用抛出错误时,我需要主服务立即应答;但我需要继续监听其他呼叫响应以记录它们或计算验证。

基本上我有类似“主要服务”的东西:

public ResponseEntity<SomeType> mainService() {
 Mono<A> remoteCallA = getRemoteCallA();
 Mono<B> remoteCallB = getRemoteCallB();

 SomeType result = Mono.zip(remoteCallA , remoteCallB)
  .doOnSuccess(...)
  .map(...)
  .block();

 return ResponseEntity.ok(response);
}

另一个服务调用远程 A 并执行一些操作并得到结果:

//TAKES 1 SECONDE TO RESPOND
public Mono<A> getRemoteCallA() {
 return client.get()
  ...
  .doOnSuccess(response ->
    //MANDATORY EXECUTION);
}

另一个调用远程 B 的服务抛出错误:

//ERROR TRHOWED IN 500 MS
public Mono<B> getRemoteCallB() {
 return client.get()
  ...
  .doOnError(exception ->
    //SOME LOGGGING
  );
}

我的问题是 call B 在调用 A 之前失败,我想立即返回 HTTP 响应。 但是,当调用 A 完成时,不会触发 doOnSuccess(也不会触发任何 doOnXXX 方法)。

我不确定,但我认为 zip 流被取消(用 log() 看到它),所以 A 事件没有被触发?

zipDelayError 不是解决方案,否则我必须等待 1 秒才能响应错误

java error-handling spring-webflux blocking
1个回答
0
投票

看起来有效的方法:

  • 缓存两者
    Publisher
    ,以便它们成为热源
  • 复制
    zip
    的订阅,以便您可以将其返回给客户端并处理本地代码中的错误
  • 使用
    onErrorXXX
  • 处理本地代码上的错误
  • 使用您希望的任何转换在本地代码上成功处理
  • 处理响应代码上的错误信号
  @Test
  void so_78147735() throws InterruptedException {

    var monoA = Mono.fromSupplier(() -> delay(1000L, false, "A")).subscribeOn(Schedulers.boundedElastic());
    var monoB = Mono.fromSupplier(() -> delay(500L, true, "B")).subscribeOn(Schedulers.boundedElastic());

    var cachedA = monoA.cache();
    var cachedB = monoB.cache();

    var response = Mono.zip(cachedA, cachedB).subscribeOn(Schedulers.boundedElastic());
    var local = Mono.zip(cachedA.map(resultOfA -> resultOfA.replace("A", "SUCCESS")),
                         cachedB.onErrorReturn("FAIL"))
                    .subscribeOn(Schedulers.boundedElastic());

    response.subscribe(objects -> System.out.println("response mono: " + objects.getT1() + " " + objects.getT2()));
    local.subscribe(objects -> System.out.println("local mono: " + objects.getT1() + " " + objects.getT2()));

    Thread.sleep(1000L);

  }

  private String delay(Long delay, Boolean error, String id) {
    try {
      Thread.sleep(delay);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

    if (error) {
      throw new IllegalArgumentException(id);
    }

    System.out.println("delay method: " + id);

    return id;
  }
© www.soinside.com 2019 - 2024. All rights reserved.