如何使用反应性单声道环绕一系列非阻塞操作?

问题描述 投票:1回答:1

我有一组非阻塞操作,它们可以启动一个操作,检查它是否完成并获得结果。问题是startgetResult不会立即一起出现,但是我需要返回结果而不会阻塞。下面的示例代码演示了我正在尝试做的事情。

@GetMapping("/some-mapping")
public Mono<String> someops(@RequestParam(value="param") String someParam) {
    int jobId = nonBlockingOps.start(someParam); // 1. start the op
    return Mono.defer( () -> {
        // wait for op to complete
        while (!nonBlockingOps.isDone(jobId)) { /// 2. check if the op finished
            try {
                Thread.sleep(1000); // This will block
            } catch (InterruptedException e) {
                return Mono.error(e);
            }
        }
        return Mono.just(nonBlockingOps.getResult(jobId)); // 3. return the result
    } );
}

因此,即使单声道为defer版本,订阅发生时也会阻塞。

1。那么,如何用Mono将其包裹起来,以便单声道处理等待的部分?

或者(或另外),您可以考虑以下三种服务:

@PostMapping("/job")
public Mono<Integer> opsStart(@RequestParam(value="param") String param) 
{ return Mono.just(nonBlockingOps.start(param)); }

@GetMapping("/job/isDone/{id}")
public Mono<Boolean> opsCheck(@PathVariable Integer id) 
{ return Mono.just(nonBlockingOps.isDone(id)); }

@GetMapping("/job/getResult/{id}")
public Mono<String> opsGet(@PathVariable Integer id) 
{ return Mono.just(nonBlockingOps.getResult(id)); }

2a。那么,如何通过调用这些函数来达到相同的结果呢?

2b。那么,如何通过WebClient调用这些服务来达到相同的结果?

spring spring-boot mono reactive-programming reactor
1个回答
0
投票
  1. 如果作业准备就绪,您可以使用Flux.interval定期进行破解:
@GetMapping("/some-mapping")
public Mono<String> someops(@RequestParam(value="param") String someParam) {
    return Mono.just(nonBlockingOps.start(someParam))
            .flatMapMany(jobId -> Flux.interval(Duration.ZERO, Duration.ofMillis(1000)).map(__ -> jobId))
            .filter(nonBlockingOps::isDone)
            .next()
            .map(nonBlockingOps::getResult);
}
  1. 使用WebClient调用三个服务的外观类似:
public class ApiClient {

    @Data
    @AllArgsConstructor
    private static class Status {
        private int jobId;
        private boolean done;
    }

    private final WebClient client;

    public ApiClient(String baseUrl) {
        client = WebClient.create(baseUrl);
    }

    public Mono<String> getJob(String param) {

        return submitJob(param)
                .flatMapMany(id -> Flux.interval(Duration.ZERO, Duration.ofMillis(1000)).map(__ -> id))
                .flatMap(this::getStatus)
                .filter(Status::isDone)
                .next()
                .flatMap(status -> getResult(status.getJobId(), String.class));
    }

    private Mono<Integer> submitJob(String param) {
        return client.post()
                .uri(builder -> builder.path("/job").queryParam("param", param).build())
                .retrieve().bodyToMono(Integer.class);
    }

    private Mono<Status> getStatus(int jobId) {
        return client.get()
                .uri(builder -> builder.path("/job/isDone/{id}").build(jobId))
                .retrieve().bodyToMono(Boolean.class)
                .map(status -> new Status(jobId, status));
    }

    private <T> Mono<T> getResult(int jobId, Class<T> clazz) {
        return client.get()
                .uri(builder -> builder.path("/job/getResult/{id}").build(jobId))
                .retrieve()
                .bodyToMono(clazz);
    }
}

老实说,我每秒轮询一次工作状态都没有多大意义。我可能至少会返回工作进度并实现某种推送通知(使用WebSocket或服务器发送的事件)。

上面的代码未经测试,但显示了总体思路。可能有些事情需要调整(请求标头等)。

© www.soinside.com 2019 - 2024. All rights reserved.