Sun Java(1.6)ScheduledThreadPoolExecutor
是ThreadPoolExecutor
的扩展,内部使用DelayQueue
的实现,这是一个无界的队列。我需要的是一个带有有界队列的ScheduledThreadpoolExecutor
,即它对队列中累积的任务有一个限制,这样当队列中的任务超过限制时,它开始拒绝进一步提交的任务并防止JVM内存不足。
令人惊讶的是,谷歌或stackoverflow没有指出我正在讨论这个问题的任何结果。有没有这样的东西可用我错过了?如果没有,我如何实现ScheduledThreadpoolExecutor以最佳方式提供我期望的功能?
正如其他人已经指出的那样,没有一种方法可以做到这一点。只要确保你尝试使用“组合”而不是“继承”。创建一个新类,通过根据必要方法的要求进行检查,实现必要的接口并委托给底层的ScheduledThreadPoolExecutor
。
您还可以使用this thread中指定的技术进行简单修改。您可以使用Semaphore#acquire
而不是使用Semaphore#tryAcquire
,并根据布尔结果决定是否需要调用拒绝处理程序。想一想,我个人认为,对于图书馆作者来说,直接将特定执行者子类化,而不是依靠组合来创建一个正常执行者的“可调度”包装,这是一种疏忽。
如何以不同方式处理它,即根据队列大小延迟任务提交。执行程序服务通过getQueue()公开队列。您可以调用它上面的size(),并根据您为队列大小规划的限制,您可以开始拒绝任务或开始延迟任务执行(增加计划时间,保持队列大小作为因素之一) )。
总而言之,这也不是最好的解决方案;只是fyi,java提供延迟队列来支持工作窃取。
最简单的解决方法是使用预定执行程序仅调度任务,而不是实际执行它们。如果执行程序队列高于阈值,则调度程序必须显式检查执行程序队列大小并丢弃任务。
另一个选项是在计划任务中检查ScheduledThreadPoolExecutor队列大小。如果队列高于阈值,则立即返回。在这种情况下,任务将立即执行并从队列中删除。所以溢出不会发生。
ScheduledThreadPoolExecutor不使用队列作为字段,而是调用getQueue。但它调用super.getQueue,它是ThreadPoolExecutor的队列。您可以使用反射覆盖它,如下所示:
public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
super(corePoolSize, handler);
setMaximumPoolSize(corePoolSize);
setKeepAliveTime(0, TimeUnit.MILLISECONDS);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
@Override
public boolean add(Runnable r) {
boolean added = offer(r);
if (added) {
return added;
} else {
getRejectedExecutionHandler().rejectedExecution(r, CrashingThreadPoolExecutor.this);
return false;
}
}
};
try {
Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
workQueueField.setAccessible(true);
workQueueField.set(this, queue);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
如果你真的,真的不想重新实现ScheduledThreadPoolExecutor
那么你可以扩展它并覆盖所有schedule*
方法并实现你自己的任务边界。但这会很糟糕:
private final Object scheduleMonitor = new Object();
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit)
{
if (command == null || unit == null)
throw new NullPointerException();
synchronized (scheduleMonitor)
{
while (getQueue().size() >= MAX_QUEUE_SIZE)
{
scheduleMonitor.wait();
}
super.schedule(command, delay, unit);
}
}
@Override
Runnable getTask()
{
final Runnable r = getTask();
synchronized (scheduleMonitor)
{
scheduleMonitor.notify();
}
return r;
}
并重复:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit)
请注意,这不会阻止重复任务使队列超过限制,它只会阻止新计划的任务。
另一个警告是,我没有通过在super.schedule
上持有锁定时调用scheduleMonitor
来检查任何死锁问题...