即使执行程序已关闭且期货已取消,程序也不会停止运行

问题描述 投票:0回答:1

我编写了一个小代码来与执行程序和线程一起练习。它包含以下内容:

  1. 使用无限队列创建大小为3的固定线程池。
  2. 将具有无限循环(while(true))的3个任务提交给池(然后所有线程都被占用)
  3. 提交第四个任务,该任务将在队列中等待。
  4. executor.shutdown()并执行一个println来查看我如何制作活动任务和任务计数。
  5. 将标志设置为false以停止无限while,然后执行println用于查看我如何进行活动任务和任务计数
  6. mayInterruptIfRunning=true取消所有期货,然后做一个println看看我如何使活动任务和任务计数有

这是代码:

public class Main {


private static ThreadPoolExecutor fixexThreadPool;

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    final Boolean[] flag = new Boolean[1];
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    for (int i = 0; i < 3; i++) {
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution.");
        }));
    }
    System.out.println("Done submiting 3 threads.");
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(3000L);
    System.out.println("Submitting a 4th thread.");

    futures.add(fixexThreadPool.submit(() -> {
        int a = 1;
        while (flag[0]) {
            a++;
        }
        System.out.println("Finishing thread execution");
    }));

    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));

    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> f.cancel(true));
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}

}

这是执行的输出:

  • 创建大小为3的固定线程池和无限队列。
  • 提交3个线程
  • 已提交3个主题。
  • 有效计数:3 |已完成的任务数:0 |任务数:3
  • 提交第四线程。
  • 有效计数:3 |已完成的任务数:0 |任务数:4
  • 执行器关闭
  • 有效计数:3 |已完成的任务数:0 |任务数:4
  • 将标志设置为false。
  • 有效计数:3 |已完成的任务数:0 |任务数:4
  • 取消所有期货。
  • 有效计数:3 |已完成的任务数:0 |任务数:4

有些事情我不明白。

  1. 为什么,在关闭执行程序后,仍然有活动线程?
  2. 为什么,为了中断无限循环将标志更改为false后,无限while不会中断?
  3. 为什么,在取消所有将来后,都有活动线程?
  4. 无论将标志更改为false,关闭执行程序,甚至取消所有期货,我的程序都不会停止运行。为什么会这样?

提前感谢!

java java.util.concurrent java-threads
1个回答
1
投票

可以通过使用volatile关键字解决此问题。 This thread提供了很多答案,详细解释了什么是volatile,并且那里有很多教程/资源/博客可以提供更多详细信息。另一个关于volatile here的超级详细线程。

坦白讲,互联网上有很多人可以比我更好,更准确地解释它,但是总之,volatile是Java修饰符,当您拥有由多个线程共享的资源时,应该使用volatile 。它告诉JVM确保每个线程缓存的值与主内存中的值同步。

显然,JVM跌落到某个地方,线程持有的值与实际值不完全匹配。对您的实现进行小的更改可以解决此问题:

使标志成为类实例变量

private volatile static Boolean[] flag = new Boolean[1];

请参阅here以了解我为什么这样做。

因此,要稍微放大一点:

private static ThreadPoolExecutor fixexThreadPool;
private volatile static Boolean[] flag = new Boolean[1];

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    ...

代码现在愉快地停止了,没有任何问题,希望这会有所帮助:)

((在旁注中,对为什么使用Boolean []而不是仅使用Boolean感到好奇吗?为了保持答案的一致性,我将其保持原样,但显然也可以将标志用作Boolean而不是数组使用)] >

-编辑-

回答您最近的评论-我很不理解我已经写的文章,但我可以提供我的想法。似乎可以在fixexThreadPool.shutdown();的文档中找到调用ThreadPoolExecutor时应用未“退出”的原因。即-

public void shutdown()

启动有序关闭,在该顺序中将执行先前提交的任务,但不会接受任何新任务。如果已关闭,则调用不会产生任何其他影响。

while循环已经提交,因此很高兴继续执行。

我对此进行了探索,以了解发生了什么。

首先,我不喜欢那么长的状态日志行,所以我为此创建了一个单独的方法!我还注意到ThreadPoolExecutor中有一些有趣的布尔状态,而Future决定也将它们记录下来:

private static void Log() {
     System.out.println(String.format("\nActive count: %s | Completed task count: %s | task count: %s", 
             fixexThreadPool.getActiveCount(), 
             fixexThreadPool.getCompletedTaskCount(), 
             fixexThreadPool.getTaskCount()));
     System.out.println(String.format("isShutdown : %s | isTerminated : %s | isTerminating : %s ", 
             fixexThreadPool.isShutdown(), 
             fixexThreadPool.isTerminated(), 
             fixexThreadPool.isTerminating())); 
     System.out.println(String.format("Futures size = %s", futures.size()));
     futures.forEach(f -> System.out.println(String.format("Future isCancelled : %s | isDone : %s", f.isCancelled(), f.isDone())));
     System.out.println("");
}

将其放入您的代码中,我们得到:

    Log();
    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    Log();
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    Log();
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> System.out.println(String.format("Future cancelled - %s", f.cancel(true))));
    Log();

我还想向应用程序添加快速的心跳信号,不时打印一次,这样我就可以看到幕后是否仍在运行:

private static void Heartbeat(int a) {
    int amod = a % 1000000000;
    if(amod == 0) {
        System.out.println(a);
    }
}

使用中:

while (flag[0]) {
    a++;
    Heartbeat(a);
}

然后我进行了一些测试,以不同的组合尝试了一些不同的事情:

  • 注释fixexThreadPool.shutdown();
  • 注释futures.forEach(f -> f.cancel(true));
  • 注释flag[0] = false;
  • 尝试/不尝试新的volatile修复程序。
  • 为了使本来就很长的答案简短一些,请随时探索这些组合,但我发现

  • 没有volatile关键字,线程被卡在terminating中州。我只能假设shutdown()不会停止这些任务已经提交,因此没有线程认为这样做会导致循环flags[0] == false,它们将进行递增a。

    据我所知,这表现出文档中概述的行为。因此,shutdown不会停止您的while循环,它只是停止将任何新的Future提交到线程池(并添加到阻塞队列),然后专心地等待while循环完成(显然,标志上没有volatile修饰符,他们永远不会)。

  • [使用易失性,但注释掉每个任务显然是关闭的终止(控制台日志4“完成线程执行。”),但是线程池保持活动状态,程序本身不会终止。池正在耐心地等待新任务,显然这是永远不会实现的。
  • “ future.cancel”逻辑目前超出我的理解。我运行了一个名为future.cancel的测试,并进行了更深入的记录/调查,但实际上并没有具体答案。

  • 我的运行理论是,将来线程已添加到线程池的阻塞队列中,因此调用cancel并不会影响线程池的执行,因此有效地单独调用future.cancel绝对没有任何作用来修复exexThreadPool。

    希望这可以提供很多思考的食物:)

© www.soinside.com 2019 - 2024. All rights reserved.