如何通过使用 Flux 在 Spring Boot 中进行并行服务器调用来优化令牌生成?

问题描述 投票:0回答:1

我正在努力优化生成令牌流的流程,但不确定如何开始实施更有效的方法。

下面,我描述了当前的方法和我的改进想法,我正在寻找一个简单的示例来说明如何使用 Spring Boot 和 Flux 来实现建议的优化。

目前的做法:

  1. 收到用户查询。

  2. 调用服务器1:根据查询获取整数列表。如果列表有效且非空,请继续下一步。

  3. 调用服务器 2:将用户查询和列表从服务器 1 传递到服务器 2。响应是令牌流。

  4. 将数据发送到前端:从服务器 2 接收到的令牌流,最后,整数列表被发送到前端。

  5. 如果第一次调用结果为 null 或空列表,则不会进行第二次调用。

建议的优化:

我想并行调用Server 1和Server 2以提高效率:

  1. 并行呼叫:同时向服务器1和服务器2发起呼叫。

  2. 缓冲:将来自服务器 2 的令牌流临时存储在缓冲区中。

  3. 处理服务器 1 响应:

    a) 如果返回索引:将缓冲的令牌发送到前端,然后继续将任何新令牌直接流式传输到前端。

    b) 如果为 null 或为空:终止服务器 2 调用并通知前端没有可用信息。

第一种方法是在 Spring Boot 中使用 Flux 实现的。但是,我不确定如何实现并行方法的缓冲和条件逻辑。

如何使用 Flux 在 Spring Boot 中高效地实现这种并行处理模型?是否有特定的模式或 Flux 方法可以帮助根据第一次调用的结果管理这些并发调用和条件响应?

java spring spring-boot spring-webflux spring-webclient
1个回答
0
投票

如何使用 Flux 在 Spring Boot 中通过并行服务器调用来优化令牌生成

1. 设置并行调用

根据预期的返回类型(单个响应与流),使用

Mono.zip
Flux.zip
同时发起对服务器 1 和服务器 2 的调用。此方法同步两个操作,允许您一起处理它们的结果,这对于优化响应时间至关重要。

2. 缓冲服务器 2 响应

要管理这些操作的异步性质,请考虑缓冲来自服务器 2 的响应。您可以在 Flux 中使用

buffer
cache
运算符,它会在您等待服务器 1 的响应时临时存储发出的令牌。这是对于管理数据流而不丢失任何临时令牌特别有用。

3. 基于服务器 1 响应的条件逻辑

服务器 1 的响应决定后续步骤:

  • 如果服务器 1 返回有效列表:继续将缓冲的令牌发送到前端,然后在发出任何后续令牌时继续流式传输它们。
  • 如果服务器 1 返回 null 或空列表:使用 Flux 上的
    takeUntilOther
    方法来停止或忽略来自服务器 2 的令牌流。此方法允许您根据触发器停止流,在本例中为来自服务器 1 的结果。

实施示例:

下面是如何在 Spring Boot 中实现此功能的简化示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public void generateTokens() {
    Mono<List<Integer>> server1Response = queryServer1();
    Flux<String> server2Tokens = queryServer2();

    Flux<String> bufferedServer2Tokens = server2Tokens.cache();

    Mono.zip(server1Response, bufferedServer2Tokens.collectList())
        .flatMap(tuple -> {
            List<Integer> server1Result = tuple.getT1();
            List<String> tokens = tuple.getT2();

            if (server1Result == null || server1Result.isEmpty()) {
                return Mono.empty();  // No further action if Server 1's response is empty
            } else {
                return Flux.concat(Flux.fromIterable(tokens), bufferedServer2Tokens);
            }
        })
        .subscribe(this::sendToFrontend, this::handleError);
}

private Mono<List<Integer>> queryServer1() {
    // Placeholder for server call implementation
}

private Flux<String> queryServer2() {
    // Placeholder for server call implementation
}

private void sendToFrontend(String token) {
    // Logic to send data to the frontend
}

private void handleError(Throwable error) {
    // Error handling logic
}

备注:

使用

Mono.zip
可以有效管理服务器响应的同步。此外,在服务器 2 的 Flux 上使用
cache
可确保您可以在必要时重新订阅缓冲的数据流,而不会产生额外的服务器调用。必须处理来自服务器 1 的空响应或 null 响应的情况,以维护到前端的数据流的完整性。

© www.soinside.com 2019 - 2024. All rights reserved.