如何使用ExecutorService进行轮询直到结果到达

问题描述 投票:5回答:2

我有一个场景,我必须轮询远程服务器检查任务是否已完成。一旦有,我会进行不同的调用以检索结果。

我原本认为我应该使用SingleThreadScheduledExecutorscheduleWithFixedDelay进行投票:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

但是因为我只能向Runnable提供scheduleWithFixedDelay,它不能返回任何东西,我不明白future什么时候完成,如果有的话。叫future.get()甚至意味着什么?我在等什么结果?

我第一次检测到远程任务已完成时,我想执行一个不同的远程调用并将其结果设置为future的值。我想我可以使用CompletableFuture,我会转发到我的poll方法,然后将它转发到我的retrieveTask方法,最终完成它:

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

但这有很多问题。例如,CompletableFuture似乎甚至不打算用于这种用途。相反,我应该做CompletableFuture.supplyAsync(() -> poll(jobId))我想,但是我怎么能正确关闭executor并取消当我的future取消/完成时返回的CompletableFuture?感觉应该以一种完全不同的方式实施民意调查。

java concurrency future executorservice completable-future
2个回答
3
投票

我认为CompletableFutures是一个很好的方法:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}

2
投票

在我看来,你比其他人更担心一些风格问题。在java 8中,CompletableFuture有两个角色:一个是传统的未来,它为任务执行和状态查询提供异步源;另一个是我们通常所说的承诺。承诺,如果您还不知道,可以被视为未来的建设者及其完成源。所以在这种情况下,直觉上需要一个承诺,这就是你在这里使用的确切情况。你担心的例子是介绍第一种用法的东西,而不是诺言方式。

接受这一点,你应该更容易开始处理你的实际问题。我认为承诺应该有两个角色,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这应该是最终解决方案:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.