使用ScheduledThreadPoolExecutor建立有界队列的最佳方法是什么?

问题描述 投票:14回答:5

Sun Java(1.6)ScheduledThreadPoolExecutorThreadPoolExecutor的扩展,内部使用DelayQueue的实现,这是一个无界的队列。我需要的是一个带有有界队列的ScheduledThreadpoolExecutor,即它对队列中累积的任务有一个限制,这样当队列中的任务超过限制时,它开始拒绝进一步提交的任务并防止JVM内存不足。

令人惊讶的是,谷歌或stackoverflow没有指出我正在讨论这个问题的任何结果。有没有这样的东西可用我错过了?如果没有,我如何实现ScheduledThreadpoolExecutor以最佳方式提供我期望的功能?

java concurrency scheduled-tasks threadpool
5个回答
5
投票

正如其他人已经指出的那样,没有一种方法可以做到这一点。只要确保你尝试使用“组合”而不是“继承”。创建一个新类,通过根据必要方法的要求进行检查,实现必要的接口并委托给底层的ScheduledThreadPoolExecutor

您还可以使用this thread中指定的技术进行简单修改。您可以使用Semaphore#acquire而不是使用Semaphore#tryAcquire,并根据布尔结果决定是否需要调用拒绝处理程序。想一想,我个人认为,对于图书馆作者来说,直接将特定执行者子类化,而不是依靠组合来创建一个正常执行者的“可调度”包装,这是一种疏忽。


2
投票

如何以不同方式处理它,即根据队列大小延迟任务提交。执行程序服务通过getQueue()公开队列。您可以调用它上面的size(),并根据您为队列大小规划的限制,您可以开始拒绝任务或开始延迟任务执行(增加计划时间,保持队列大小作为因素之一) )。

总而言之,这也不是最好的解决方案;只是fyi,java提供延迟队列来支持工作窃取。


1
投票

最简单的解决方法是使用预定执行程序仅调度任务,而不是实际执行它们。如果执行程序队列高于阈值,则调度程序必须显式检查执行程序队列大小并丢弃任务。

另一个选项是在计划任务中检查ScheduledThreadPoolExecutor队列大小。如果队列高于阈值,则立即返回。在这种情况下,任务将立即执行并从队列中删除。所以溢出不会发生。


1
投票

ScheduledThreadPoolExecutor不使用队列作为字段,而是调用getQueue。但它调用super.getQueue,它是ThreadPoolExecutor的队列。您可以使用反射覆盖它,如下所示:

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
  public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
    super(corePoolSize, handler);
    setMaximumPoolSize(corePoolSize);
    setKeepAliveTime(0, TimeUnit.MILLISECONDS);
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (added) {
                    return added;
                } else {
                    getRejectedExecutionHandler().rejectedExecution(r, CrashingThreadPoolExecutor.this);
                    return false;
                }
            }
        };

    try {
        Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
        workQueueField.setAccessible(true);
        workQueueField.set(this, queue);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
  }
}

0
投票

如果你真的,真的不想重新实现ScheduledThreadPoolExecutor那么你可以扩展它并覆盖所有schedule*方法并实现你自己的任务边界。但这会很糟糕:

private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) 
{
    if (command == null || unit == null)
        throw new NullPointerException();

    synchronized (scheduleMonitor)
    {                
        while (getQueue().size() >= MAX_QUEUE_SIZE)
        {
           scheduleMonitor.wait();
        }
        super.schedule(command, delay, unit);
    }
}

@Override
Runnable getTask() 
{
    final Runnable r = getTask();
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notify();
    }
    return r;
}

并重复:

  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

请注意,这不会阻止重复任务使队列超过限制,它只会阻止新计划的任务。

另一个警告是,我没有通过在super.schedule上持有锁定时调用scheduleMonitor来检查任何死锁问题...

© www.soinside.com 2019 - 2024. All rights reserved.