无法制作具有大小限制的缓存线程池?

问题描述 投票:114回答:11

似乎不可能创建一个缓存的线程池,它可以创建的线程数限制。

以下是在标准Java库中实现静态Executors.newCachedThreadPool的方法:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因此,使用该模板继续创建固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

现在,如果你使用它并提交3个任务,一切都会好的。提交任何进一步的任务将导致被拒绝的执行异常。

试试这个:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

将导致所有线程按顺序执行。即,线程池永远不会有多个线程来处理您的任务。

这是ThreadPoolExecutor的execute方法中的错误?或者这可能是故意的?还是有其他方式?

编辑:我想要一些与缓存线程池完全相同的东西(它根据需要创建线程,然后在一些超时后杀死它们)但是它可以创建的线程数量受到限制,并且一旦有了它就能够继续排队其他任务达到了它的线程限制。根据sjlee的回应,这是不可能的。查看ThreadPoolExecutor的execute()方法确实是不可能的。我需要继承ThreadPoolExecutor并覆盖execute(),就像SwingWorker一样,但SwingWorker在其execute()中所做的是一个完整的hack。

java multithreading concurrency executorservice threadpoolexecutor
11个回答
220
投票

ThreadPoolExecutor具有以下几个关键行为,您的问题可以通过这些行为来解释。

提交任务时,

  1. 如果线程池未达到核心大小,则会创建新线程。
  2. 如果已达到核心大小且没有空闲线程,则会对任务进行排队。
  3. 如果已达到核心大小,则没有空闲线程,并且队列变满,它会创建新线程(直到达到最大大小)。
  4. 如果已达到最大大小,则没有空闲线程,并且队列已满,拒绝策略将启动。

在第一个示例中,请注意SynchronousQueue的大小基本为0.因此,当您达到最大大小(3)时,拒绝策略将启动(#4)。

在第二个示例中,选择的队列是LinkedBlockingQueue,其大小不受限制。因此,你会陷入行为#2。

你不能真正修改缓存类型或固定类型,因为它们的行为几乎完全确定。

如果您想拥有一个有界和动态的线程池,则需要使用正核心大小和最大大小以及有限大小的队列。例如,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

附录:这是一个相当古老的答案,看来JDK在核心大小为0时改变了它的行为。从JDK 1.6开始,如果核心大小为0且池没有任何线程,则ThreadPoolExecutor将添加一个线程来执行该任务。因此,核心大小为0是上述规则的例外。感谢Stevebringing引起我的注意。


1
投票

这是另一种解决方案。我认为这个解决方案的行为符合您的要求(虽然不为此解决方案感到自豪):

Executors.newFixedThreadPool(int n)

0
投票
  1. 您可以使用@sjlee建议的final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { public boolean offer(Runnable o) { if (size() > 1) return false; return super.offer(o); }; public boolean add(Runnable o) { if (super.offer(o)) return true; else throw new IllegalStateException("Queue full"); } }; RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { queue.add(r); } }; dbThreadExecutor = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler); 您可以动态控制池的大小。有关详细信息,请查看此问题: ThreadPoolExecutor 要么
  2. 您可以使用随java 8引入的Dynamic Thread Pool API。 newWorkStealingPool 使用所有可用处理器作为其目标并行度级别创建工作窃取线程池。

默认情况下,并行度级别设置为服务器中的CPU核心数。如果您有4个核心CPU服务器,则线程池大小为4.此API返回public static ExecutorService newWorkStealingPool() 类型的ForkJoinPool,并允许通过从ForkJoinPool中的繁忙线程中窃取任务来窃取空闲线程。


58
投票

除非我错过了什么,否则原始问题的解决方案很简单。以下代码实现了原始海报所描述的所需行为。它将产生最多5个线程来处理无界队列,空闲线程将在60秒后终止。

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

7
投票

有同样的问题。由于没有其他答案将所有问题放在一起,我加入我的:

它现在清楚地写在docs中:如果你使用一个不阻塞的队列(LinkedBlockingQueue),最大线程设置没有效果,只使用核心线程。

所以:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

该执行人具有:

  1. 没有最大线程的概念,因为我们正在使用无界队列。这是一件好事,因为这样的队列可能会导致执行程序创建大量非核心额外线程(如果它遵循其通常的策略)。
  2. 最大尺寸Integer.MAX_VALUE的队列。如果未决任务的数量超过Submit()RejectedExecutionException将抛出Integer.MAX_VALUE。不确定我们会先耗尽内存还是会发生这种情况。
  3. 有4个核心线程可能。闲置核心线程在空闲5秒后自动退出。所以,是的,严格按需求threads.Number可以使用setThreads()方法改变。
  4. 确保核心线程的最小数量永远不会少于1,否则submit()将拒绝每个任务。由于核心线程需要> =最大线程,因此方法setThreads()也设置了最大线程,尽管最大线程设置对于无界队列是无用的。

6
投票

在您的第一个示例中,后续任务被拒绝,因为AbortPolicy是默认的RejectedExecutionHandler。 ThreadPoolExecutor包含以下策略,您可以通过setRejectedExecutionHandler方法更改这些策略:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

听起来你想要一个CallerRunsPolicy的缓存线程池。


5
投票

这里的答案都没有解决我的问题,这与使用Apache的HTTP客户端(3.x版本)创建有限数量的HTTP连接有关。由于我花了几个小时来找出一个好的设置,我将分享:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

这将创建一个以5开头的ThreadPoolExecutor,并使用CallerRunsPolicy执行最多10个同时运行的线程。


3
投票

根据ThreadPoolExecutor的Javadoc:

如果有多个corePoolSize但运行的maximumPoolSize线程少于maximumPoolSize,则只有在队列已满时才会创建新线程。通过设置corePoolSize和maximumPoolSize,可以创建固定大小的线程池。

(强调我的。)

抖动的答案就是你想要的,虽然我的回答你的另一个问题。 :)


2
投票

还有一个选择。您也可以使用任何其他队列,而不是使用新的SynchronousQueue,但您必须确保其大小为1,这样才会强制执行器服务创建新线程。


2
投票

看起来好像没有任何答案实际回答这个问题 - 实际上我看不到这样做的方法 - 即使你从PooledExecutorService继承子类,因为许多方法/属性是私有的,例如使addIfUnderMaximumPoolSize受到保护您可以执行以下操作:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

我得到的最接近的是 - 但即使这不是一个很好的解决方案

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

附:未经上述测试


2
投票

这就是你想要的(至少我猜是这样)。有关解释,请查看qazxsw poi

Jonathan Feinberg answer

创建一个线程池,该线程池重用在共享的无界队列中运行的固定数量的线程。在任何时候,最多nThreads线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到线程可用。如果任何线程由于在关闭之前执行期间的故障而终止,则在需要执行后续任务时将使用新的线程。池中的线程将一直存在,直到它被明确关闭。

© www.soinside.com 2019 - 2024. All rights reserved.