我想要达到的目标:
从积极的 Flux 并行发送尽可能多的 http 请求到非常可靠的第三方服务
背景:
第三方服务非常可靠,可以承受非常多的请求。到目前为止,我是该第三方服务的唯一客户。请我尽可能地打击第三方服务器。我无法控制的第三方服务不提供任何批量/列表 API。我只能一个一个发送请求。在他们这边,每个请求都需要固定的一秒钟来处理。
我尝试了什么:
这是我的 http 客户端的配置,以及发送 http 请求的逻辑(希望尽可能多的请求,越快越好)
客户
@Bean
public WebClient webClient(WebClient.Builder builder) {
final var clientConnector = new ClientConnector();
final var httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
return builder
.baseUrl("http://some-host:8080/api/path")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
}
通量:
// some very fast flux
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
//send as many requests as fast as possible, the default flatMap number should be 256
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class));
//doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));
使用这个,我问了第三方服务,他们给了我一个数字 N,我每秒的请求数。
首先观察,flatMap 的默认并发数应该是 256。并且由于第三方需要一秒钟来处理请求,所以我预计每秒 256 个请求的速率 N。
然而,我还差得远呢。第三方服务告诉我每秒有 16 个左右的请求
额外尝试:
因为我想扩展发送到第三方服务器的请求数量,我正在调整并发数如下:
尝试编号 1:
// some very fast flux
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
//BUMP to 2048, 10x increase
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class), 2048);
//doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));
在调整 flatMap 的并发参数时,我预计发送的请求数会更高,这里是 10*N。但一点也不,我仍然是每秒 16 个左右的请求。
问题:
如果 flatMap 是实现该目标的最佳方法(请告诉我是否应该更改为其他方法)
如果是,为什么不缩放?为什么它不起作用?
如果你需要一个非常快的
Flux
无限且尽可能快地生成值,你可以使用
Flux<Integer> inputFlux = Mono.just(1).repeat()
如果你需要增加值,你可以使用
Flux<Integer> inputFlux = Flux.generate(
() -> 1, (s, sink) -> {
sink.next(s);
return s + 1;
})
这两者都将同步运行并尽可能快地产生价值。它们可以被想象成一个紧密的
for(;;)
循环,所以适用于紧密 for
循环的所有陷阱也适用于此,所以要小心。
您无法向 3rd 方服务生成更多请求的最可能原因是,一旦您
flatMap
到 WebClient
,实际请求处理由 jetty-reactive-httpclient
库的调度策略控制。我只是在猜测,但我假设您需要调整 HttpClient
库(由 jetty-reactive-httpclient
内部使用)的并发连接数,可能是 IO 线程,保持活动状态,流水线等。此外,第 3 方服务可能正在使用某种形式的不太明显的速率限制(比如不允许保持活动状态,不支持 HTTP 管道)。
所以从本质上讲,更改
flatMap()
运算符的并发和预取设置不会自动转化为执行实际 IO 工作的 Jetty HttpClient
库的更高吞吐量配置。你可以在这里阅读 Jetty 的HttpClient
https://www.eclipse.org/jetty/documentation/jetty-11/programming-guide/index.html.
此外,在分析时,你也应该观察你机器上的流量(打开了多少套接字,来自你机器的流量,套接字的状态等)因为你的请求仍然可能被一些中间人减慢是否是一些反向代理或路由器、交换机,甚至是您机器上的操作系统或防病毒软件。