我有一个工作单位列表,我想并行处理它们。每个单元工作时间为 8-15 秒,完全计算时间,无 I/O 阻塞。我想要实现的是拥有一个
ExecutorService
:
类似:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}
我尝试过但没有成功的是:
newCachedThreadPool()
会创建太多线程new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>())
,但后来我注意到由于同步队列,submit() 被阻塞了new LinkedBlockingQueue()
,只是为了发现 ThreadPoolExecutor 只生成一个线程我确信有官方的实现来处理这个非常基本的并发用例。 有人可以建议吗?
LinkedBlockingQueue
创建 20
作为 corePoolSize
(构造函数中的第一个参数):
new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
LinkedBlockingQueue
而没有预定义容量,则 Pool
:
maxPoolSize
。corePoolSize
指定数量更多的线程。在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将它设置为
0
和 以前版本的 Java(如果 <1.6)corePoolSize
设置为 0
, 就不会创建任何内容(他们怎么敢?) .
其他版本确实会创建一个新线程,即使
corePoolSize
是0
,这看起来像……修复是……一个bug,……改变了……逻辑行为? .
使用无界队列(例如没有队列的
) 预定义容量)将导致新任务在队列中等待 所有 corePoolSize 线程都忙。因此,不超过 corePoolSize 线程将永远被创建。 (以及maximumPoolSize的值 因此没有任何效果。)LinkedBlockingQueue
关于缩小规模
为了在没有工作要做时删除所有线程,您必须专门关闭
coreThreads
(默认情况下它们不会终止)。要实现此目的,请在启动 allowCoreThreadTimeOut(true)
之前设置
Pool
。
注意设置正确的
keep-alive
超时:例如,如果平均每6秒收到一个新任务,将保持活动时间设置为5秒可能导致不必要的擦除+创建操作( 哦亲爱的线程,你只需要等一秒钟!)。根据任务接收收入速度来设置这个超时时间。
allowCoreThreadTimeOut
设置控制核心线程是否超时的策略 如果在保活时间内没有任务到达则终止,即 当新任务到达时,如果需要则更换。当 false 时,核心线程是 从未因缺少传入任务而终止。 当为真时,同样 适用于非核心线程的保持活动策略也适用于核心线程 线程。为了避免不断的线程替换,保持活动时间 设置 true 时必须大于零。这个方法应该在 一般在池被主动使用之前调用。
TL/DR
LinkedBloquingQueue
作为任务队列。corePoolSize
取代 maxPoolSize
的 含义。allowCoreThreadTimeOut(true)
,以便允许 Pool
缩小规模 使用基于超时的机制,该机制也会影响 coreThreads
。keep-alive
值设置为基于任务接收延迟的逻辑值。这个新鲜组合将导致
ExecutorService
,99,99999%的时间不会阻止提交者(要发生这种情况,排队的任务数量应该是2.147.483.647 ),这可以有效地缩放工作负载基础上的线程数量,在 { 0 <--> corePoolSize }
并发线程之间波动(双向)。
建议,应该监控队列的大小,因为非阻塞行为是有代价的:如果队列在不受控制的情况下持续增长,则获得
OOM
异常的概率,直到满足 INTEGER.MAX_VALUE
(f.e:如果线程僵持了一整天,提交者不断插入任务)。 即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的链接包装器等......也是很多额外的负载。
最简单的方法就是使用方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
属于执行者类。这为您提供了一个简单的开箱即用的解决方案。您获得的池将根据需要扩大和缩小。您可以使用处理核心线程超时等方法进一步配置它。
ScheduledExecutorService 是 ExecutorService 类的扩展,并且是唯一开箱即用的可以动态扩展和收缩的类。