我需要通过基于Spring的Rest Service API的非阻塞io并行调用两个下游系统。但是第一个下游系统容量是一次10个请求,第二个下游系统容量是100个。
第一个下游系统输出到第二个下游系统,因此我可以向第二个系统提出更并行的请求,以加快处理速度。
第二个下游系统响应非常大,因此无法将所有响应具体存储在内存中,因此立即想将响应返回给客户端。
前工作流程:
示例代码:
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture> list = new ArrayList<>();
AtomicInteger ai = new AtomicInteger(1);
RestTemplate restTemplate = new RestTemplate();
for (int i = 0; i < 100; i++) {
CompletableFuture<Object> cff = CompletableFuture.supplyAsync(
() -> ai.getAndAdd(1) + " first downstream web service " +
restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)
).thenApplyAsync(v -> {
Random r = new Random();
Integer in = r.nextInt(1000);
return v + " second downstream web service " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
}, executor);
list.add(cff);
}
return Flux.fromStream(list.stream().map(m -> {
try {
return m.get().toString();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "";
})
);
}
此代码仅在我得到响应后才在前五个线程中起作用,所有线程均已完成该过程。但是,一旦我从第二个下游系统获得响应,就需要立即对客户端做出响应。
注意:上面的代码未使用二级线程池实现。
谢谢你。
如果您使用Spring-Webflux构建非阻塞系统,最好在示例中利用WebClient的功能。我创建了一个简单的测试应用程序,下面的代码段对我有用:
private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
return Flux
.range(0, 100) // prepare initial 100 requests
.window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)
// .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
.doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready
.flatMap(flow -> flow
// perform an external async call for each element in batch of 10
// they will be executed sequentially but there will not be any performance issues because
// calls are async. If you wish you can add .parallel() to the flow to make it parallel
.flatMap(element -> w.get().exchange())
.map(r -> r.bodyToMono(MyClass.class))
)
// subscribe to each response and throw received element further to the stream
.flatMap(response -> Mono.create(s -> response.subscribe(s::success)))
.window(1000) // batch of 1000 is ready
.flatMap(flow -> flow
.flatMap(element -> w.get().exchange())
.map(r -> r.bodyToMono(MyClass.class))
)
.flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}
public static class MyClass {
public Integer i;
}