似乎不可能创建一个缓存的线程池,它可以创建的线程数限制。
以下是在标准Java库中实现静态Executors.newCachedThreadPool的方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因此,使用该模板继续创建固定大小的缓存线程池:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());
现在,如果你使用它并提交3个任务,一切都会好的。提交任何进一步的任务将导致被拒绝的执行异常。
试试这个:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());
将导致所有线程按顺序执行。即,线程池永远不会有多个线程来处理您的任务。
这是ThreadPoolExecutor的execute方法中的错误?或者这可能是故意的?还是有其他方式?
编辑:我想要一些与缓存线程池完全相同的东西(它根据需要创建线程,然后在一些超时后杀死它们)但是它可以创建的线程数量受到限制,并且一旦有了它就能够继续排队其他任务达到了它的线程限制。根据sjlee的回应,这是不可能的。查看ThreadPoolExecutor的execute()方法确实是不可能的。我需要继承ThreadPoolExecutor并覆盖execute(),就像SwingWorker一样,但SwingWorker在其execute()中所做的是一个完整的hack。
ThreadPoolExecutor具有以下几个关键行为,您的问题可以通过这些行为来解释。
提交任务时,
在第一个示例中,请注意SynchronousQueue的大小基本为0.因此,当您达到最大大小(3)时,拒绝策略将启动(#4)。
在第二个示例中,选择的队列是LinkedBlockingQueue,其大小不受限制。因此,你会陷入行为#2。
你不能真正修改缓存类型或固定类型,因为它们的行为几乎完全确定。
如果您想拥有一个有界和动态的线程池,则需要使用正核心大小和最大大小以及有限大小的队列。例如,
new ThreadPoolExecutor(10, // core size
50, // max size
10*60, // idle timeout
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20)); // queue with a size
附录:这是一个相当古老的答案,看来JDK在核心大小为0时改变了它的行为。从JDK 1.6开始,如果核心大小为0且池没有任何线程,则ThreadPoolExecutor将添加一个线程来执行该任务。因此,核心大小为0是上述规则的例外。感谢Steve为bringing引起我的注意。
这是另一种解决方案。我认为这个解决方案的行为符合您的要求(虽然不为此解决方案感到自豪):
Executors.newFixedThreadPool(int n)
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
public boolean offer(Runnable o) {
if (size() > 1)
return false;
return super.offer(o);
};
public boolean add(Runnable o) {
if (super.offer(o))
return true;
else
throw new IllegalStateException("Queue full");
}
};
RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
queue.add(r);
}
};
dbThreadExecutor =
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
您可以动态控制池的大小。有关详细信息,请查看此问题:
ThreadPoolExecutor
要么默认情况下,并行度级别设置为服务器中的CPU核心数。如果您有4个核心CPU服务器,则线程池大小为4.此API返回public static ExecutorService newWorkStealingPool()
类型的ForkJoinPool
,并允许通过从ForkJoinPool中的繁忙线程中窃取任务来窃取空闲线程。
除非我错过了什么,否则原始问题的解决方案很简单。以下代码实现了原始海报所描述的所需行为。它将产生最多5个线程来处理无界队列,空闲线程将在60秒后终止。
tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);
有同样的问题。由于没有其他答案将所有问题放在一起,我加入我的:
它现在清楚地写在docs中:如果你使用一个不阻塞的队列(LinkedBlockingQueue
),最大线程设置没有效果,只使用核心线程。
所以:
public class MyExecutor extends ThreadPoolExecutor {
public MyExecutor() {
super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
allowCoreThreadTimeOut(true);
}
public void setThreads(int n){
setMaximumPoolSize(Math.max(1, n));
setCorePoolSize(Math.max(1, n));
}
}
该执行人具有:
Integer.MAX_VALUE
的队列。如果未决任务的数量超过Submit()
,RejectedExecutionException
将抛出Integer.MAX_VALUE
。不确定我们会先耗尽内存还是会发生这种情况。setThreads()
方法改变。submit()
将拒绝每个任务。由于核心线程需要> =最大线程,因此方法setThreads()
也设置了最大线程,尽管最大线程设置对于无界队列是无用的。在您的第一个示例中,后续任务被拒绝,因为AbortPolicy
是默认的RejectedExecutionHandler
。 ThreadPoolExecutor包含以下策略,您可以通过setRejectedExecutionHandler
方法更改这些策略:
CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy
听起来你想要一个CallerRunsPolicy的缓存线程池。
这里的答案都没有解决我的问题,这与使用Apache的HTTP客户端(3.x版本)创建有限数量的HTTP连接有关。由于我花了几个小时来找出一个好的设置,我将分享:
private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
这将创建一个以5开头的ThreadPoolExecutor
,并使用CallerRunsPolicy
执行最多10个同时运行的线程。
根据ThreadPoolExecutor的Javadoc:
如果有多个corePoolSize但运行的maximumPoolSize线程少于maximumPoolSize,则只有在队列已满时才会创建新线程。通过设置corePoolSize和maximumPoolSize,可以创建固定大小的线程池。
(强调我的。)
抖动的答案就是你想要的,虽然我的回答你的另一个问题。 :)
还有一个选择。您也可以使用任何其他队列,而不是使用新的SynchronousQueue,但您必须确保其大小为1,这样才会强制执行器服务创建新线程。
看起来好像没有任何答案实际回答这个问题 - 实际上我看不到这样做的方法 - 即使你从PooledExecutorService继承子类,因为许多方法/属性是私有的,例如使addIfUnderMaximumPoolSize受到保护您可以执行以下操作:
class MyThreadPoolService extends ThreadPoolService {
public void execute(Runnable run) {
if (poolSize() == 0) {
if (addIfUnderMaximumPoolSize(run) != null)
return;
}
super.execute(run);
}
}
我得到的最接近的是 - 但即使这不是一个很好的解决方案
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
public void execute(Runnable command) {
if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {
super.setCorePoolSize(super.getCorePoolSize() + 1);
}
super.execute(command);
}
protected void afterExecute(Runnable r, Throwable t) {
// nothing in the queue
if (getQueue().isEmpty() && getPoolSize() > min) {
setCorePoolSize(getCorePoolSize() - 1);
}
};
};
附:未经上述测试
这就是你想要的(至少我猜是这样)。有关解释,请查看qazxsw poi
创建一个线程池,该线程池重用在共享的无界队列中运行的固定数量的线程。在任何时候,最多nThreads线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到线程可用。如果任何线程由于在关闭之前执行期间的故障而终止,则在需要执行后续任务时将使用新的线程。池中的线程将一直存在,直到它被明确关闭。