我试图使用ThreadPoolExecutor执行许多任务。以下是一个假设的例子:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
问题是我很快就得到了一个java.util.concurrent.RejectedExecutionException
,因为任务数量超过了工作队列的大小。但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间。完成此任务的最佳方法是什么?
在一些非常狭窄的情况下,您可以实现执行所需操作的java.util.concurrent.RejectedExecutionHandler。
RejectedExecutionHandler block = new RejectedExecutionHandler() {
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue().put( r );
}
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
现在。由于以下原因,这是一个非常糟糕的主意
一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用execute()的线程上运行任务来限制你的应用程序。
但是,有时候一个具有所有固有风险的阻止策略实际上就是你想要的。我会说在这些条件下
所以,正如我所说。它很少需要并且可能很危险,但是你去了。
祝好运。
您可以使用semaphore
来阻止线程进入池中。
ExecutorService service = new ThreadPoolExecutor(
3,
3,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(6, false)
);
Semaphore lock = new Semaphore(6); // equal to queue capacity
for (int i = 0; i < 100000; i++ ) {
try {
lock.acquire();
service.submit(() -> {
try {
task.run();
} finally {
lock.release();
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
一些陷阱:
您需要做的是将ThreadPoolExecutor包装到Executor中,它明确限制其中并发执行的操作量:
private static class BlockingExecutor implements Executor {
final Semaphore semaphore;
final Executor delegate;
private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
semaphore = new Semaphore(concurrentTasksLimit);
this.delegate = delegate;
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
final Runnable wrapped = () -> {
try {
command.run();
} finally {
semaphore.release();
}
};
delegate.execute(wrapped);
}
}
您可以将concurrentTasksLimit调整为委托执行程序的threadPoolSize + queueSize,它几乎可以解决您的问题
这是我在这种情况下的代码片段:
public void executeBlocking( Runnable command ) {
if ( threadPool == null ) {
logger.error( "Thread pool '{}' not initialized.", threadPoolName );
return;
}
ThreadPool threadPoolMonitor = this;
boolean accepted = false;
do {
try {
threadPool.execute( new Runnable() {
@Override
public void run() {
try {
command.run();
}
// to make sure that the monitor is freed on exit
finally {
// Notify all the threads waiting for the resource, if any.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.notifyAll();
}
}
}
} );
accepted = true;
}
catch ( RejectedExecutionException e ) {
// Thread pool is full
try {
// Block until one of the threads finishes its job and exits.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.wait();
}
}
catch ( InterruptedException ignored ) {
// return immediately
break;
}
}
} while ( !accepted );
}
threadPool是java.util.concurrent.ExecutorService的本地实例,已经初始化。
这就是我最终做的事情:
int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
try {
lock.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
// Task logic
} finally {
lock.release();
}
});
}
我使用自定义的RejectedExecutionHandler解决了这个问题,它只是暂时阻塞调用线程,然后再次尝试提交任务:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
这个类只能在线程池执行器中用作RejectedExecutionHandler,就像任何其他类一样。在这个例子中:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
我看到的唯一缺点是调用线程可能会被锁定比严格必要的时间长(最长250毫秒)。对于许多短期运行的任务,可能会将等待时间减少到10ms左右。此外,由于这个执行器被有效地递归调用,因此很长时间等待线程可用(小时)可能导致堆栈溢出。
不过,我个人喜欢这种方法。它结构紧凑,易于理解,运行良好。我错过了什么重要的事吗?