我需要一次执行4个任务,如下所示:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
一旦完成所有内容,我该如何收到通知?现在我想不出比设置一些全局任务计数器更好的事情,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变为0;或获得一个Futures列表,并在无限循环监视器isDone中为所有这些。什么是更好的解决方案不涉及无限循环?
谢谢。
基本上在ExecutorService
你叫shutdown()
然后awaitTermination()
:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
你可以将你的任务包装在另一个runnable中,它将发送通知:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
这里有两个选项,只是有点混淆哪一个最好去。
选项1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
选项2:
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
这里放下future.get();在尝试捕获是好主意吗?
我刚刚编写了一个示例程序来解决您的问题。没有给出简洁的实现,所以我将添加一个。虽然你可以使用executor.shutdown()
和executor.awaitTermination()
,但这不是最好的做法,因为不同线程所花费的时间是不可预测的。
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
只是为了提供更多替代方案,使用闩锁/障碍物。您还可以获得部分结果,直到所有结果都使用CompletionService完成。
在实践中来自Java Concurrency:“如果你有一批计算要提交给Executor,并且你想在结果可用时检索它们,你可以保留与每个任务相关的Future,并通过调用get来重复轮询完成零超时。这是可能的,但很乏味。幸运的是有更好的方法:完成服务。“
在这里实施
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
你可以使用这段代码:
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
这是我的解决方案,基于“AdamSkywalker”提示,它的工作原理
package frss.main;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestHilos {
void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
System.out.println("FIN DEL PROCESO DE HILOS");
}
private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();
Hilo01 task1 = new Hilo01();
tasks.add(task1);
Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}
private class Hilo01 extends Thread {
@Override
public void run() {
System.out.println("HILO 1");
}
}
private class Hilo02 extends Thread {
@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}
}
public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}
我创建了以下工作示例。我们的想法是有一种方法来处理一个任务池(我使用队列作为示例)和许多线程(通过numberOfTasks / threshold以编程方式确定),并等到所有线程完成后继续进行其他一些处理。
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** Testing CountDownLatch and ExecutorService to manage scenario where
* multiple Threads work together to complete tasks from a single
* resource provider, so the processing can be faster. */
public class ThreadCountDown {
private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();
public static void main(String[] args) {
// Create a queue with "Tasks"
int numberOfTasks = 2000;
while(numberOfTasks-- > 0) {
tasks.add(numberOfTasks);
}
// Initiate Processing of Tasks
ThreadCountDown main = new ThreadCountDown();
main.process(tasks);
}
/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
threadsCountdown = new CountDownLatch(numberOfThreads);
ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);
//Initialize each Thread
while(numberOfThreads-- > 0) {
System.out.println("Initializing Thread: "+numberOfThreads);
threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
}
try {
//Shutdown the Executor, so it cannot receive more Threads.
threadExecutor.shutdown();
threadsCountdown.await();
System.out.println("ALL THREADS COMPLETED!");
//continue With Some Other Process Here
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
int threshold = 100;
int threads = size / threshold;
if( size > (threads*threshold) ){
threads++;
}
return threads;
}
/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
return tasks.poll();
}
/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {
private String threadName;
protected MyThread(String threadName) {
super();
this.threadName = threadName;
}
@Override
public void run() {
Integer task;
try{
//Check in the Task pool if anything pending to process
while( (task = getTask()) != null ){
processTask(task);
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
/*Reduce count when no more tasks to process. Eventually all
Threads will end-up here, reducing the count to 0, allowing
the flow to continue after threadsCountdown.await(); */
threadsCountdown.countDown();
}
}
private void processTask(Integer task){
try{
System.out.println(this.threadName+" is Working on Task: "+ task);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
}
希望能帮助到你!
您可以使用自己的ExecutorCompletionService子类来包装taskExecutor
,以及您自己的BlockingQueue实现,以便在每个任务完成时获取信息,并在完成任务的数量达到预期目标时执行您想要的任何回调或其他操作。
你应该使用executorService.shutdown()
和executorService.awaitTermination
方法。
一个例子如下:
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
所以我在这里发布链接问题的答案,因为有人想要一个更简单的方法来做到这一点
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] = CompletableFuture.runAsync(runner, executor);
}
CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
并在你的任务中(包含在try / finally中)
latch.countDown();
Java 8 - 我们可以使用流API来处理流。请参阅下面的代码段
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// doSomething();
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
如果doSomething()
抛出一些其他例外,latch.countDown()
似乎不会执行,那么我该怎么办?
如果您使用更多线程ExecutionServices SEQUENTIALLY并希望等待每个EXECUTIONSERVICE完成。最好的方法如下;
ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
executer1.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer1.shutdown();
try{
executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
for (true) {
executer2.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer2.shutdown();
} catch (Exception e){
...
}
这可能有所帮助
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
你可以在这个Runner类上调用waitTillDone():
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
您可以重用此类并在调用shutdown()之前多次调用waitTillDone(),此外您的代码非常简单。此外,您不必预先知道任务的数量。
要使用它,只需将此gradle / maven compile 'com.github.matejtymes:javafixes:1.3.1'
依赖项添加到您的项目中。
更多详情可在这找到:
执行程序getActiveCount()
中有一个方法 - 它给出了活动线程的数量。
在跨越线程后,我们可以检查activeCount()
值是否为0
。一旦值为零,就意味着当前没有活动线程正在运行,这意味着任务已完成:
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}
ExecutorService.invokeAll()
为你做。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
您还可以使用期货清单:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
然后,当你想要加入所有这些时,它基本上相当于加入每个,(带来额外的好处,它将子线程的异常重新引发到main):
for(Future f: this.futures) { f.get(); }
基本上诀窍是一次调用每个Future上的.get(),而不是无限循环调用isDone()on(all或each)。因此,一旦最后一个线程完成,您就可以保证“继续”前进并经过此块。需要注意的是,由于.get()调用会重新引发异常,如果其中一个线程死掉,你可能会在其他线程完成之前从这里加注[为了避免这种情况,你可以在get中添加一个catch ExecutionException
呼叫]。另一个警告是它保留了对所有线程的引用,所以如果它们有线程局部变量,它们将不会被收集,直到你通过这个块之后(尽管你可能能够绕过它,如果它成为一个问题,通过删除未来离开ArrayList)。如果你想知道哪个“最先完成”你可以使用像https://stackoverflow.com/a/31885029/32453这样的东西
在Java8中,您可以使用CompletableFuture执行此操作:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
只是我的两分钱。为了克服CountDownLatch
事先知道任务数量的要求,你可以通过使用简单的Semaphore
以旧式的方式做到这一点。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
在你的任务中,只需像s.release()
一样调用latch.countDown();
游戏有点晚了但是为了完成...
而不是“等待”所有任务完成,你可以考虑好莱坞原则,“不要打电话给我,我会打电话给你” - 当我完成时。我认为结果代码更优雅......
Guava提供了一些有趣的工具来实现这一目标。
一个例子 ::
将ExecutorService包装到ListeningExecutorService ::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
提交一系列可执行的callables ::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
现在必不可少的部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
附加一个回调给ListenableFuture,你可以用来在所有期货完成时收到通知::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
这也提供了一个优点,您可以在处理完成后在一个地方收集所有结果...
更多信息here
Java 5及更高版本中的CyclicBarrier类是为这类事物而设计的。
请遵循以下方法之一。
submit
ExecutorService
返回并检查状态,并根据get()
的建议在Future
对象上调用Kiran
invokeAll()
上使用ExecutorServiceshutdown, awaitTermination, shutdownNow
API相关的SE问题: