动态缩放线程数量的ExecutorService

问题描述 投票:0回答:2

我有一个工作单位列表,我想并行处理它们。每个单元工作时间为 8-15 秒,完全计算时间,无 I/O 阻塞。我想要实现的是拥有一个

ExecutorService

  • 当没有工作要做时,实例化零个线程
  • 如果需要,可以动态扩展到 20 个线程
  • 允许我一次添加所有工作单位(不阻止提交)

类似:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    );
}
while(queue.peek() != null) {
    ... process results while they arrive ...
}

我尝试过但没有成功的是:

  • 使用
    newCachedThreadPool()
    会创建太多线程
  • 然后我使用了它的内部调用
    new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>())
    ,但后来我注意到由于同步队列,submit() 被阻塞了
  • 所以我使用了
    new LinkedBlockingQueue()
    ,只是为了发现 ThreadPoolExecutor 只生成一个线程

我确信有官方的实现来处理这个非常基本的并发用例。 有人可以建议吗?

java multithreading java.util.concurrent
2个回答
7
投票

使用 ThreadPoolExecutor

LinkedBlockingQueue
创建
20
 作为 
corePoolSize
构造函数中的第一个参数):

new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());


如果您使用

LinkedBlockingQueue
而没有预定义容量,则
Pool

  • 不会检查
    maxPoolSize
  • 不会创建比
    corePoolSize
    指定数量更多的线程。

在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将它设置为

0
以前版本的 Java(如果 <1.6)corePoolSize 设置为
0
 
就不会创建任何内容(他们怎么敢?) .

其他版本确实会创建一个新线程,即使

corePoolSize
0
,这看起来像……修复是……一个bug,……改变了……逻辑行为? .

线程池执行器

使用无界队列(例如没有队列的

LinkedBlockingQueue
) 预定义容量)将导致新任务在队列中等待 所有 corePoolSize 线程都忙。因此,不超过 corePoolSize 线程将永远被创建(以及maximumPoolSize的值 因此没有任何效果。)


关于缩小规模

为了在没有工作要做时删除所有线程,您必须专门关闭

coreThreads
默认情况下它们不会终止)。要实现此目的,请在启动 allowCoreThreadTimeOut(true)
 之前设置 
Pool

注意设置正确的

keep-alive
超时:例如,如果平均每6秒收到一个新任务,将保持活动时间设置为5秒可能导致不必要的擦除+创建操作( 哦亲爱的线程,你只需要等一秒钟!)。根据任务接收收入速度来设置这个超时时间。

allowCoreThreadTimeOut

设置控制核心线程是否超时的策略 如果在保活时间内没有任务到达则终止,即 当新任务到达时,如果需要则更换。当 false 时,核心线程是 从未因缺少传入任务而终止。 当为真时,同样 适用于非核心线程的保持活动策略也适用于核心线程 线程。为了避免不断的线程替换,保持活动时间 设置 true 时必须大于零。这个方法应该在 一般在池被主动使用之前调用。


TL/DR

  • 无界
    LinkedBloquingQueue
    作为任务队列。
  • corePoolSize
    取代
    maxPoolSize
    含义
  • allowCoreThreadTimeOut(true)
    ,以便允许
    Pool
    缩小规模 使用基于超时的机制,该机制也会影响
    coreThreads
  • keep-alive
    值设置为基于任务接收延迟的逻辑值

这个新鲜组合将导致

ExecutorService
,99,99999%的时间不会阻止提交者(要发生这种情况,排队的任务数量应该是2.147.483.647 ),这可以有效地缩放工作负载基础上的线程数量,在 { 0 <--> corePoolSize }
并发线程之间波动
(双向)

建议,应该监控队列的大小,因为非阻塞行为是有代价的:如果队列在不受控制的情况下持续增长,则获得

OOM
异常的概率,直到满足
INTEGER.MAX_VALUE
(f.e:如果线程僵持了一整天,提交者不断插入任务)即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的链接包装器等......也是很多额外的负载。


4
投票

最简单的方法就是使用方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

属于执行者类。这为您提供了一个简单的开箱即用的解决方案。您获得的池将根据需要扩大和缩小。您可以使用处理核心线程超时等方法进一步配置它。
ScheduledExecutorService 是 ExecutorService 类的扩展,并且是唯一开箱即用的可以动态扩展和收缩的类。

© www.soinside.com 2019 - 2024. All rights reserved.