我有一个代码块,应该使用
CompletableFuture
和 ExecutorService
在不同线程上并行运行。该代码块发出网络请求(使用 RetroFit),并等待每个网络请求完成,然后再继续发出下一个请求,等等。我可以多次调用 SingleThreadExecutor#submit()
并传递该执行器吗?
我想确保该代码块的所有并行运行都有自己的线程,并在该线程上运行所有操作。
基本上,代码块应该像同步代码一样运行,但是该块的多个实例并行运行,每个实例都在自己的线程上。
目前,当我们从
Service
拨打电话时,程序就会闲置,我怀疑这与传递相同的SingleThreadExecutor
有关,但我有点卡住了。
ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < SIZE; i++) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Future<List<String>> stringListFuture = myStringService.getStringList(executor);
// We begin stalling once the next line gets hit.
List<String> stringList = stringListFuture.get();
Future<Integer> integerDataFuture = myIntegerService.getInteger(executor);
Integer integerData = integerDataFuture.get();
executor.shutdown();
}, executor);
futures.add(future);
}
CompletableFuture<Void> completableFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
completableFutures.join();
// Do something after all blocks of code have finished running in parallel.
class MyStringFuture {
public Future<List<String>> getStringList(ExecutorService executor) {
// Could these nested uses of the same executor be the problem?
Future<Double> doubeDataFuture = doubleDataService.getFuture();
Double doubleData = doubeDataFuture.get(executor);
Call<List<Dividend>> data = retrofit.create(Api.class).getData(doubleData);
return executor.submit(() -> data.execute().body());
}
}
您的代码使用单线程执行器任务,它将另一个任务提交给同一个单线程执行器,然后等待该子任务退出。与此示例相同,它将打印“ONE”和“THREE”,并且从不打印“TWO”或“cf”:
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log("ONE");
// This subtask can never start while current task is still running:
Future<?> cf = executor.submit(() -> log("TWO"));
log("THREE");
try
{
// Blocks forever if run from single thread executor:
log("cf"+cf.get());
}
catch (Exception e)
{
throw new RuntimeException("It failed");
}
log("FOUR");
executor.shutdown();
}, executor);
子任务只能在主任务退出后运行 - 如果打印了“FOUR” - 但卡在等待中
cf.get()
。
解决方案很简单 - 您应该在单独的线程或执行器队列上处理初始任务,以供子任务使用的服务。