如何运行一系列任务,其中一些任务按顺序运行,而另一个任务与这些顺序任务并行运行。最后,我想结合这些顺序任务和并行任务的结果,然后再次传递给顺序任务列表。
例如:比方说,我有 6 个任务 - t1、t2、t3、t4、t5、t6。
我想按顺序运行 t1->t2->t3,而 t4 可以与这些并行运行。一旦所有这些都完成了,我想将输出提供给 t5->t6。
注意:任务的数量可能会有所不同,因此我正在寻找一个通用的答案,我可以在其中传递必须按顺序运行的任务列表和可以并行运行的任务。
寻找有关如何在 Java 中实现它的建议?
我相信这可能是你想要的:
ExecutorService parallel = Executors.newCachedThreadpool();
ExecutorService sequentialGlobal = Executors.newSingleThreadExecutor();
// first, we add a a parallel executor which runs two sets of tasks in parallel
sequentialGlobal.execute( () -> {
parallel.execute( () -> { //add the first set of tasks to run sequentially in one thread
ExecutorService sequential = Executors.newSingleThreadExecutor();
for(Runnable r: sequentials) sequential.execute(r); //sequentials would contain t1, t2 and t3 in the example
}
parallel.execute(t4); //and the other in another thread
//next we await the termination of those 2 threads
parallel.awaitTermination(timeout, timeUint); //select some value that fits your need
});
// we add the remaining tasks to run sequentially
for (Runnable r: lastSequentials) { //t5, t6
sequentialGlobal.execute(r);
}
// and await termination.
sequentialGlobal.awaitTermination(timeout, timeUnit);
我这样工作,顺序执行器,是单线程执行器,保证任务按添加的顺序顺序执行。并行执行器为每个任务创建一个线程。
总的来说:
ExecutorService
上执行即可,而 Executors.newCachedThreadPool()
可能是一个不错的默认选择。.thenApply(...)
.join()
停止并等待执行结果CompletableFuture.allOf
有一个CompletableFuture
几个在你的情况下它可能是这样的:
ExecutorService executor = Executors.newCachedThreadPool()
Supplier<String> task1 = { "task1" }
Function<String, String> task2 = { it + " -> task2" }
Function<String, String> task3 = { it + " -> task3" }
Supplier<String> task4 = { "task4" }
CompletableFuture<String> task3result = CompletableFuture.supplyAsync(task1, executor)
.thenApply(task2)
.thenApply(task3)
CompletableFuture<String> task4result = CompletableFuture.supplyAsync(task4, executor)
Function<Void, String> merge123and4 = {
"[[" + task3result.join() + "] + " + task4result.join() + "]"
}
Function<String, String> task5 = { it + " -> task5" }
Function<String, String> task6 = { it + " -> task6" }
// or instead of .allOf().thenApplyAsync() you could simply use a new .supplyAsync()
CompletableFuture<String> result = CompletableFuture.allOf(task3result, task4result)
.thenApplyAsync(merge123and4, executor)
.thenApply(task5)
.thenApply(task6)
System.out.println("Result: " + result.join())
印花:
Result: [[task1 -> task2 -> task3] + task4] -> task5 -> task6