我有一个调用其他远程服务的 REST 服务(我们称之为“主服务”)。出于性能目的,当其中一个远程调用抛出错误时,我需要主服务立即应答;但我需要继续监听其他呼叫响应以记录它们或计算验证。
基本上我有类似“主要服务”的东西:
public ResponseEntity<SomeType> mainService() {
Mono<A> remoteCallA = getRemoteCallA();
Mono<B> remoteCallB = getRemoteCallB();
SomeType result = Mono.zip(remoteCallA , remoteCallB)
.doOnSuccess(...)
.map(...)
.block();
return ResponseEntity.ok(response);
}
另一个服务调用远程 A 并执行一些操作并得到结果:
//TAKES 1 SECONDE TO RESPOND
public Mono<A> getRemoteCallA() {
return client.get()
...
.doOnSuccess(response ->
//MANDATORY EXECUTION);
}
另一个调用远程 B 的服务抛出错误:
//ERROR TRHOWED IN 500 MS
public Mono<B> getRemoteCallB() {
return client.get()
...
.doOnError(exception ->
//SOME LOGGGING
);
}
我的问题是 call B 在调用 A 之前失败,我想立即返回 HTTP 响应。 但是,当调用 A 完成时,不会触发 doOnSuccess(也不会触发任何 doOnXXX 方法)。
我不确定,但我认为 zip 流被取消(用 log() 看到它),所以 A 事件没有被触发?
zipDelayError 不是解决方案,否则我必须等待 1 秒才能响应错误
看起来有效的方法:
Publisher
,以便它们成为热源zip
的订阅,以便您可以将其返回给客户端并处理本地代码中的错误onErrorXXX
@Test
void so_78147735() throws InterruptedException {
var monoA = Mono.fromSupplier(() -> delay(1000L, false, "A")).subscribeOn(Schedulers.boundedElastic());
var monoB = Mono.fromSupplier(() -> delay(500L, true, "B")).subscribeOn(Schedulers.boundedElastic());
var cachedA = monoA.cache();
var cachedB = monoB.cache();
var response = Mono.zip(cachedA, cachedB).subscribeOn(Schedulers.boundedElastic());
var local = Mono.zip(cachedA.map(resultOfA -> resultOfA.replace("A", "SUCCESS")),
cachedB.onErrorReturn("FAIL"))
.subscribeOn(Schedulers.boundedElastic());
response.subscribe(objects -> System.out.println("response mono: " + objects.getT1() + " " + objects.getT2()));
local.subscribe(objects -> System.out.println("local mono: " + objects.getT1() + " " + objects.getT2()));
Thread.sleep(1000L);
}
private String delay(Long delay, Boolean error, String id) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (error) {
throw new IllegalArgumentException(id);
}
System.out.println("delay method: " + id);
return id;
}