我有一个带有任务的
BlockingQueue
,以及执行它们的单个守护线程:
public class TaskManager {
private final BlockingQueue<Task> taskQueue;
public TaskManager() {
this.taskQueue = new ArrayBlockingQueue<>(5); //Max 5 tasks waiting
Thread taskRunner = new Thread(this::runTask);
taskRunner.setDaemon(true); //Infinite loop inside shouldn't prevent JVM shutdown
taskRunner.start();
}
//Called async
public void enqueueTask(Task task) {
boolean added = false;
try {
added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {/*no-op*/}
if (!added) {
publishEvent(new Fail(task)); //Async notify the client their task won't run
}
}
//Also called async
public void dequeueTask(Task task) {
this.taskQueue.remove(task);
}
void runTask() {
while (true) {
try {
Task task = taskQueue.take(); //Blocks indefinitely
doStuff(task);
} catch (InterruptedException e) {
//Can only be a spurious interruption. Ignore.
}
}
}
}
看起来并没有什么特别的。但。感觉就像我正在通过自己管理队列和守护进程来重新发明轮子。我希望用
Executor(Service)
可以轻松做到这一点。我可以有一个带有 1 个线程的 ThreadPoolExecutor
,给它一个有界队列。它为我提供了一个 remove
方法来使任务出队。但如果排队时间太长,似乎没有办法超时(在我的示例中,taskQueue.offer()
超时 5 秒)。
我所做的事情是否已经合法,只是我想太多了?如果没有,我可以用
Executor
替换它吗?如何替换?
重要的是要理解,
ThreadPoolExecutor
永远不会等待队列变得“未满”。它使用 offer
,如果容量耗尽,它会立即返回 false
,如果发生这种情况并且已达到配置的最大线程大小,它将调用 RejectedExecutionHandler
。
RejectedExecutionHandler
:
RejectedExecutionHandler reh = (r, es) -> {
boolean added = false;
try {
BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {/*no-op*/}
if(!added) {
//assuming the Runnable and "Task" are interchangeable
publishEvent(new Fail(r));
}
};
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
。