我在 Spring WebFlux(2.3.5) 和 java 11 中编写 API,我需要创建一个具有三个顺序函数调用的高效服务层。在第一个函数中,有五个不同返回类型的并行调用:
所有网络调用均使用 Webclient 进行。如果网络调用返回Class3为空,我们需要抛出400 BadRequest。该机器有3个核心和3GB内存。 API 应该具有低延迟和高吞吐量。
这是我的实现:
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
public class MyService {
public Mono<FinalResult> performOperations() {
return Mono.zip(
getFluxOfClass1().collectList().subscribeOn(Schedulers.boundedElastic()),
getFluxOfClass2().collectList().subscribeOn(Schedulers.boundedElastic()),
getClass3().subscribeOn(Schedulers.boundedElastic()),
getClass4().subscribeOn(Schedulers.boundedElastic()),
getClass5().subscribeOn(Schedulers.boundedElastic()))
.flatMap(tuple -> {
List<Class1> class1List = tuple.getT1();
List<Class2> class2List = tuple.getT2();
Class3 class3 = tuple.getT3();
Class4 class4 = tuple.getT4();
Class5 class5 = tuple.getT5();
if (class3 == null) {
return Mono.error(new ResponseStatusException(HttpStatus.BAD_REQUEST, "Class3 is empty"));
}
return function2(class1List, class2List, class3, class4, class5)
.flatMap(result -> function3(result));
});
}
// other functions
}
Mono.zip 中的所有函数都在同一个 boundedElastic 线程中运行。我尝试使用 Schedulers.boundedElastic() 进行 subscribeOn、publishOn。如何让它们在不同的线程中运行?任何帮助,将不胜感激。 谢谢。