我有一组非阻塞操作,它们可以启动一个操作,检查它是否完成并获得结果。问题是start
和getResult
不会立即一起出现,但是我需要返回结果而不会阻塞。下面的示例代码演示了我正在尝试做的事情。
@GetMapping("/some-mapping")
public Mono<String> someops(@RequestParam(value="param") String someParam) {
int jobId = nonBlockingOps.start(someParam); // 1. start the op
return Mono.defer( () -> {
// wait for op to complete
while (!nonBlockingOps.isDone(jobId)) { /// 2. check if the op finished
try {
Thread.sleep(1000); // This will block
} catch (InterruptedException e) {
return Mono.error(e);
}
}
return Mono.just(nonBlockingOps.getResult(jobId)); // 3. return the result
} );
}
因此,即使单声道为defer
版本,订阅发生时也会阻塞。
1。那么,如何用Mono
将其包裹起来,以便单声道处理等待的部分?
或者(或另外),您可以考虑以下三种服务:
@PostMapping("/job")
public Mono<Integer> opsStart(@RequestParam(value="param") String param)
{ return Mono.just(nonBlockingOps.start(param)); }
@GetMapping("/job/isDone/{id}")
public Mono<Boolean> opsCheck(@PathVariable Integer id)
{ return Mono.just(nonBlockingOps.isDone(id)); }
@GetMapping("/job/getResult/{id}")
public Mono<String> opsGet(@PathVariable Integer id)
{ return Mono.just(nonBlockingOps.getResult(id)); }
2a。那么,如何通过调用这些函数来达到相同的结果呢?
2b。那么,如何通过WebClient
调用这些服务来达到相同的结果?
Flux.interval
定期进行破解:@GetMapping("/some-mapping")
public Mono<String> someops(@RequestParam(value="param") String someParam) {
return Mono.just(nonBlockingOps.start(someParam))
.flatMapMany(jobId -> Flux.interval(Duration.ZERO, Duration.ofMillis(1000)).map(__ -> jobId))
.filter(nonBlockingOps::isDone)
.next()
.map(nonBlockingOps::getResult);
}
public class ApiClient {
@Data
@AllArgsConstructor
private static class Status {
private int jobId;
private boolean done;
}
private final WebClient client;
public ApiClient(String baseUrl) {
client = WebClient.create(baseUrl);
}
public Mono<String> getJob(String param) {
return submitJob(param)
.flatMapMany(id -> Flux.interval(Duration.ZERO, Duration.ofMillis(1000)).map(__ -> id))
.flatMap(this::getStatus)
.filter(Status::isDone)
.next()
.flatMap(status -> getResult(status.getJobId(), String.class));
}
private Mono<Integer> submitJob(String param) {
return client.post()
.uri(builder -> builder.path("/job").queryParam("param", param).build())
.retrieve().bodyToMono(Integer.class);
}
private Mono<Status> getStatus(int jobId) {
return client.get()
.uri(builder -> builder.path("/job/isDone/{id}").build(jobId))
.retrieve().bodyToMono(Boolean.class)
.map(status -> new Status(jobId, status));
}
private <T> Mono<T> getResult(int jobId, Class<T> clazz) {
return client.get()
.uri(builder -> builder.path("/job/getResult/{id}").build(jobId))
.retrieve()
.bodyToMono(clazz);
}
}
老实说,我每秒轮询一次工作状态都没有多大意义。我可能至少会返回工作进度并实现某种推送通知(使用WebSocket或服务器发送的事件)。
上面的代码未经测试,但显示了总体思路。可能有些事情需要调整(请求标头等)。