我编写了一个小代码来与执行程序和线程一起练习。它包含以下内容:
while(true)
)的3个任务提交给池(然后所有线程都被占用)while
,然后执行println用于查看我如何进行活动任务和任务计数mayInterruptIfRunning=true
取消所有期货,然后做一个println看看我如何使活动任务和任务计数有这是代码:
public class Main {
private static ThreadPoolExecutor fixexThreadPool;
public static void main(String[] args) throws InterruptedException {
System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
final Boolean[] flag = new Boolean[1];
flag[0] = true;
List<Future> futures = new ArrayList<>();
System.out.println("Submiting 3 threads");
for (int i = 0; i < 3; i++) {
futures.add(fixexThreadPool.submit(() -> {
int a = 1;
while (flag[0]) {
a++;
}
System.out.println("Finishing thread execution.");
}));
}
System.out.println("Done submiting 3 threads.");
System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
Thread.sleep(3000L);
System.out.println("Submitting a 4th thread.");
futures.add(fixexThreadPool.submit(() -> {
int a = 1;
while (flag[0]) {
a++;
}
System.out.println("Finishing thread execution");
}));
System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
System.out.println("Executor shutdown");
fixexThreadPool.shutdown();
System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
Thread.sleep(2000L);
System.out.println("Setting flag to false.");
flag[0] = false;
Thread.sleep(2000L);
System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
System.out.println("Cancelling all futures.");
futures.forEach(f -> f.cancel(true));
System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}
}
这是执行的输出:
有些事情我不明白。
while
不会中断? 提前感谢!
可以通过使用volatile
关键字解决此问题。 This thread提供了很多答案,详细解释了什么是volatile,并且那里有很多教程/资源/博客可以提供更多详细信息。另一个关于volatile here的超级详细线程。
坦白讲,互联网上有很多人可以比我更好,更准确地解释它,但是总之,volatile是Java修饰符,当您拥有由多个线程共享的资源时,应该使用volatile 。它告诉JVM确保每个线程缓存的值与主内存中的值同步。
显然,JVM跌落到某个地方,线程持有的值与实际值不完全匹配。对您的实现进行小的更改可以解决此问题:
使标志成为类实例变量
private volatile static Boolean[] flag = new Boolean[1];
请参阅here以了解我为什么这样做。
因此,要稍微放大一点:
private static ThreadPoolExecutor fixexThreadPool;
private volatile static Boolean[] flag = new Boolean[1];
public static void main(String[] args) throws InterruptedException {
System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
flag[0] = true;
List<Future> futures = new ArrayList<>();
System.out.println("Submiting 3 threads");
...
代码现在愉快地停止了,没有任何问题,希望这会有所帮助:)
((在旁注中,对为什么使用Boolean []而不是仅使用Boolean感到好奇吗?为了保持答案的一致性,我将其保持原样,但显然也可以将标志用作Boolean而不是数组使用)] >
-编辑-
回答您最近的评论-我很不理解我已经写的文章,但我可以提供我的想法。似乎可以在fixexThreadPool.shutdown();
的文档中找到调用ThreadPoolExecutor时应用未“退出”的原因。即-
public void shutdown()
启动有序关闭,在该顺序中将执行先前提交的任务,但不会接受任何新任务。如果已关闭,则调用不会产生任何其他影响。
while循环已经提交,因此很高兴继续执行。
我对此进行了探索,以了解发生了什么。
首先,我不喜欢那么长的状态日志行,所以我为此创建了一个单独的方法!我还注意到ThreadPoolExecutor中有一些有趣的布尔状态,而Future决定也将它们记录下来:
private static void Log() { System.out.println(String.format("\nActive count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount())); System.out.println(String.format("isShutdown : %s | isTerminated : %s | isTerminating : %s ", fixexThreadPool.isShutdown(), fixexThreadPool.isTerminated(), fixexThreadPool.isTerminating())); System.out.println(String.format("Futures size = %s", futures.size())); futures.forEach(f -> System.out.println(String.format("Future isCancelled : %s | isDone : %s", f.isCancelled(), f.isDone()))); System.out.println(""); }
将其放入您的代码中,我们得到:
Log(); System.out.println("Executor shutdown"); fixexThreadPool.shutdown(); Log(); Thread.sleep(2000L); System.out.println("Setting flag to false."); flag[0] = false; Thread.sleep(2000L); Log(); System.out.println("Cancelling all futures."); futures.forEach(f -> System.out.println(String.format("Future cancelled - %s", f.cancel(true)))); Log();
我还想向应用程序添加快速的心跳信号,不时打印一次,这样我就可以看到幕后是否仍在运行:
private static void Heartbeat(int a) { int amod = a % 1000000000; if(amod == 0) { System.out.println(a); } }
使用中:
while (flag[0]) { a++; Heartbeat(a); }
然后我进行了一些测试,以不同的组合尝试了一些不同的事情:
fixexThreadPool.shutdown();
futures.forEach(f -> f.cancel(true));
flag[0] = false;
volatile
修复程序。为了使本来就很长的答案简短一些,请随时探索这些组合,但我发现
没有volatile关键字,线程被卡在terminating
中州。我只能假设shutdown()
不会停止这些任务已经提交,因此没有线程认为这样做会导致循环flags[0] == false
,它们将进行递增a。
据我所知,这表现出文档中概述的行为。因此,shutdown不会停止您的while循环,它只是停止将任何新的Future提交到线程池(并添加到阻塞队列),然后专心地等待while循环完成(显然,标志上没有volatile修饰符,他们永远不会)。
“ future.cancel”逻辑目前超出我的理解。我运行了一个名为future.cancel的测试,并进行了更深入的记录/调查,但实际上并没有具体答案。
我的运行理论是,将来线程已添加到线程池的阻塞队列中,因此调用cancel并不会影响线程池的执行,因此有效地单独调用future.cancel绝对没有任何作用来修复exexThreadPool。
希望这可以提供很多思考的食物:)