我对
ExecutorService
有以下要求:
原因:任务的生产者可能会压垮执行器服务或队列,如果队列中的任务过多,则会导致内存需求增加。在这种情况下,背压将阻止生产者,直到处理能够赶上。生产者(一些从 Kafka 读取,另一些从 JDBC 读取)可能有可能超时的开放事务,因此新任务的提交(如果阻塞)也应该能够超时。
作为执行器服务实现,例如
ThreadPoolExecutor
,使用 BlockingQueue
将任务排入队列,我本来期望队列实际上具有这样的功能,即仅排队达到定义数量的任务,然后阻塞直到任务由工作线程从队列中拉出。相反,事实证明,当队列已满时,其他任务将被拒绝,并在提交时显示 RejectedExecutionException
。这当然不是我想要的。
我想出了一个
ExecutorService
的包装器,它使用 Semaphore
来控制同时接受多少个任务,这也允许我在获取信号量的所有许可时阻止(超时):
public class Pipeline {
private final ExecutorService executorService;
private final Semaphore semaphore;
private final int queueTimeout;
private final TimeUnit queueTimeoutUnit;
public Pipeline(int maxConcurrent, int queueTimeout, TimeUnit queueTimeoutUnit) {
semaphore = new Semaphore(maxConcurrent);
executorService = Executors.newCachedThreadPool();
this.queueTimeout = queueTimeout;
this.queueTimeoutUnit = queueTimeoutUnit;
}
public <T> Future<T> submit(Callable<T> task) {
try {
boolean acquired = semaphore.tryAcquire(queueTimeout, queueTimeoutUnit);
if (!acquired) {
throw new RuntimeException("Timeout accepting task");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return executorService.submit(() -> {
try {
return task.call();
} finally {
semaphore.release();
}
});
}
public void shutdown() {
executorService.shutdown();
}
}
这确实有效,但似乎是一个足够常见的用例,可能已经在 Java API 中涵盖了。我是否缺少一些内置功能?
如果您创建一个
ThreadPoolExecutor
,并提供固定长度的 ArrayBlockingQueue
和您自己的自定义 RejectedExecutionHandler
,会怎样?
注意!我自己没有尝试过这个。这只是一个想法。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
class MyHandler implements RejectedExecutionHandler {
final BlockingQueue<Runnable> task_queue;
final long timeout;
final TimeUnit unit;
public MyHandler(
BlockingQueue<Runnable> task_queue,
long timeout,
TimeUnit unit
) {
this.task_queue = task_queue;
this.timeout = timeout;
this.unit = unit;
}
@override
public void rejectedExecution(
Runnable task,
ThreadPoolExecutor pool
) {
boolean timed_out = false;
try {
timed_out = ! task_queue.offer(task, timeout, unit);
}
catch (Exception ex) {
...use your shameful imagination here...
}
if (timed_out) {
throw new RejectedExecutionException("queue is full");
}
}
}
然后,创建执行器:
BlockingQueue<Runnable> task_queue = new ArrayBlockingQueue<>(...);
ExecutorService pool = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize,
keepAliveTime, keepAliveTimeUnit,
task_queue,
new MyHandler(task_queue, submitTimeout, submitTimeoutUnit)
);
这有点 hack-y,因为它弄乱了执行器背后的队列,但我的期望是,如果当你的代码调用
pool.submit(task)
时队列已满,那么处理程序的 rejectedExecution()
方法将被调用。它将等待指定的时间将任务添加到队列中。
如果成功,则任务将添加到队列中,每个人都很高兴。如果失败,则
RejectedExecutionException
将传播回 pool.submit(task)
的调用者以捕获并处理您认为合适的方式。
这是标准库中缺少的东西,也是 LMAX Disruptor 库存在的原因:
https://lmax-exchange.github.io/disruptor/
我的经验是它使并发消息队列变得轻而易举 - 根本不需要线程代码。需要注意的是,最低延迟等待策略需要大量 CPU 资源。