我正在努力优化生成令牌流的流程,但不确定如何开始实施更有效的方法。
下面,我描述了当前的方法和我的改进想法,我正在寻找一个简单的示例来说明如何使用 Spring Boot 和 Flux 来实现建议的优化。
目前的做法:
收到用户查询。
调用服务器1:根据查询获取整数列表。如果列表有效且非空,请继续下一步。
调用服务器 2:将用户查询和列表从服务器 1 传递到服务器 2。响应是令牌流。
将数据发送到前端:从服务器 2 接收到的令牌流,最后,整数列表被发送到前端。
如果第一次调用结果为 null 或空列表,则不会进行第二次调用。
建议的优化:
我想并行调用Server 1和Server 2以提高效率:
并行呼叫:同时向服务器1和服务器2发起呼叫。
缓冲:将来自服务器 2 的令牌流临时存储在缓冲区中。
处理服务器 1 响应:
a) 如果返回索引:将缓冲的令牌发送到前端,然后继续将任何新令牌直接流式传输到前端。
b) 如果为 null 或为空:终止服务器 2 调用并通知前端没有可用信息。
第一种方法是在 Spring Boot 中使用 Flux 实现的。但是,我不确定如何实现并行方法的缓冲和条件逻辑。
如何使用 Flux 在 Spring Boot 中高效地实现这种并行处理模型?是否有特定的模式或 Flux 方法可以帮助根据第一次调用的结果管理这些并发调用和条件响应?
根据预期的返回类型(单个响应与流),使用
Mono.zip
或 Flux.zip
同时发起对服务器 1 和服务器 2 的调用。此方法同步两个操作,允许您一起处理它们的结果,这对于优化响应时间至关重要。
要管理这些操作的异步性质,请考虑缓冲来自服务器 2 的响应。您可以在 Flux 中使用
buffer
或 cache
运算符,它会在您等待服务器 1 的响应时临时存储发出的令牌。这是对于管理数据流而不丢失任何临时令牌特别有用。
服务器 1 的响应决定后续步骤:
takeUntilOther
方法来停止或忽略来自服务器 2 的令牌流。此方法允许您根据触发器停止流,在本例中为来自服务器 1 的结果。下面是如何在 Spring Boot 中实现此功能的简化示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public void generateTokens() {
Mono<List<Integer>> server1Response = queryServer1();
Flux<String> server2Tokens = queryServer2();
Flux<String> bufferedServer2Tokens = server2Tokens.cache();
Mono.zip(server1Response, bufferedServer2Tokens.collectList())
.flatMap(tuple -> {
List<Integer> server1Result = tuple.getT1();
List<String> tokens = tuple.getT2();
if (server1Result == null || server1Result.isEmpty()) {
return Mono.empty(); // No further action if Server 1's response is empty
} else {
return Flux.concat(Flux.fromIterable(tokens), bufferedServer2Tokens);
}
})
.subscribe(this::sendToFrontend, this::handleError);
}
private Mono<List<Integer>> queryServer1() {
// Placeholder for server call implementation
}
private Flux<String> queryServer2() {
// Placeholder for server call implementation
}
private void sendToFrontend(String token) {
// Logic to send data to the frontend
}
private void handleError(Throwable error) {
// Error handling logic
}
使用
Mono.zip
可以有效管理服务器响应的同步。此外,在服务器 2 的 Flux 上使用 cache
可确保您可以在必要时重新订阅缓冲的数据流,而不会产生额外的服务器调用。必须处理来自服务器 1 的空响应或 null 响应的情况,以维护到前端的数据流的完整性。