队列已满时ThreadPoolExecutor阻塞?

问题描述 投票:49回答:6

我试图使用ThreadPoolExecutor执行许多任务。以下是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

问题是我很快就得到了一个java.util.concurrent.RejectedExecutionException,因为任务数量超过了工作队列的大小。但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间。完成此任务的最佳方法是什么?

java multithreading concurrency executorservice executor
6个回答
60
投票

在一些非常狭窄的情况下,您可以实现执行所需操作的java.util.concurrent.RejectedExecutionHandler。

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

现在。由于以下原因,这是一个非常糟糕的主意

  • 它很容易出现死锁,因为池中的所有线程都可能在您放入队列中的事物可见之前死亡。通过设置合理的保持活动时间来缓解此问题。
  • 任务没有像Executor所期望的那样包装。许多执行程序实现在执行之前将其任务包装在某种跟踪对象中。看看你的来源。
  • API强烈建议不要通过getQueue()添加,并且可能在某些时候被禁止。

一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用execute()的线程上运行任务来限制你的应用程序。

但是,有时候一个具有所有固有风险的阻止策略实际上就是你想要的。我会说在这些条件下

  • 你只有一个线程调用execute()
  • 您必须(或想要)拥有非常小的队列长度
  • 你绝对需要限制运行这项工作的线程数(通常是出于外部原因),并且调用者运行策略会破坏它。
  • 您的任务大小不可预测,因此如果池暂时忙于执行4个短任务并且您的一个线程调用执行程序遇到大问题,则调用者运行可能会引入饥饿。

所以,正如我所说。它很少需要并且可能很危险,但是你去了。

祝好运。


5
投票

您可以使用semaphore来阻止线程进入池中。

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

一些陷阱:

  • 仅将此模式与固定线程池一起使用。队列不太可能经常填满,因此不会创建新线程。查看ThreadPoolExecutor上的java文档以获取更多详细信息:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html有一种解决方法,但它超出了这个答案的范围。
  • 队列大小应该高于核心线程的数量。如果我们要使队列大小为3,最终会发生什么: T0:所有三个线程都在工作,队列为空,没有许可证可用。 T1:线程1完成,释放许可证。 T2:线程1轮询队列以获取新工作,找不到,并等待。 T3:主线程将工作提交到池中,线程1开始工作。 上面的例子转换为线程主线程阻塞线程1.它可能看起来像一个小周期,但现在将频率乘以天和月。突然之间,短时间内浪费了大量的时间。

5
投票

您需要做的是将ThreadPoolExecutor包装到Executor中,它明确限制其中并发执行的操作量:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

您可以将concurrentTasksLimit调整为委托执行程序的threadPoolSize + queueSize,它几乎可以解决您的问题


1
投票

这是我在这种情况下的代码片段:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool是java.util.concurrent.ExecutorService的本地实例,已经初始化。


1
投票

这就是我最终做的事情:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

0
投票

我使用自定义的RejectedExecutionHandler解决了这个问题,它只是暂时阻塞调用线程,然后再次尝试提交任务:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

这个类只能在线程池执行器中用作RejectedExecutionHandler,就像任何其他类一样。在这个例子中:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

我看到的唯一缺点是调用线程可能会被锁定比严格必要的时间长(最长250毫秒)。对于许多短期运行的任务,可能会将等待时间减少到10ms左右。此外,由于这个执行器被有效地递归调用,因此很长时间等待线程可用(小时)可能导致堆栈溢出。

不过,我个人喜欢这种方法。它结构紧凑,易于理解,运行良好。我错过了什么重要的事吗?

© www.soinside.com 2019 - 2024. All rights reserved.