带有背压的ExecutorService

问题描述 投票:0回答:2

我对

ExecutorService
有以下要求:

  • 并发处理任务数量有限(线程),最好可配置
  • 当所有线程都被占用时,后续提交的任务将被排队
  • 队列应应用背压:如果提交的任务过多且队列已满,则提交应阻塞
  • 阻塞提交可以有一个超时:如果在给定时间内未提交任务,则应抛出异常。

原因:任务的生产者可能会压垮执行器服务或队列,如果队列中的任务过多,则会导致内存需求增加。在这种情况下,背压将阻止生产者,直到处理能够赶上。生产者(一些从 Kafka 读取,另一些从 JDBC 读取)可能有可能超时的开放事务,因此新任务的提交(如果阻塞)也应该能够超时。

作为执行器服务实现,例如

ThreadPoolExecutor
,使用
BlockingQueue
将任务排入队列,我本来期望队列实际上具有这样的功能,即仅排队达到定义数量的任务,然后阻塞直到任务由工作线程从队列中拉出。相反,事实证明,当队列已满时,其他任务将被拒绝,并在提交时显示
RejectedExecutionException
。这当然不是我想要的。

我想出了一个

ExecutorService
的包装器,它使用
Semaphore
来控制同时接受多少个任务,这也允许我在获取信号量的所有许可时阻止(超时):

public class Pipeline {

    private final ExecutorService executorService;
    private final Semaphore semaphore;

    private final int queueTimeout;
    private final TimeUnit queueTimeoutUnit;

    public Pipeline(int maxConcurrent, int queueTimeout, TimeUnit queueTimeoutUnit) {
        semaphore = new Semaphore(maxConcurrent);
        executorService = Executors.newCachedThreadPool();

        this.queueTimeout = queueTimeout;
        this.queueTimeoutUnit = queueTimeoutUnit;
    }

    public <T> Future<T> submit(Callable<T> task) {
        try {
            boolean acquired = semaphore.tryAcquire(queueTimeout, queueTimeoutUnit);
            if (!acquired) {
                throw new RuntimeException("Timeout accepting task");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return executorService.submit(() -> {
            try {
                return task.call();
            } finally {
                semaphore.release();
            }
        });
    }

    public void shutdown() {
        executorService.shutdown();
    }
}

这确实有效,但似乎是一个足够常见的用例,可能已经在 Java API 中涵盖了。我是否缺少一些内置功能?

java concurrency semaphore executorservice java.util.concurrent
2个回答
1
投票

如果您创建一个

ThreadPoolExecutor
,并提供固定长度的
ArrayBlockingQueue
和您自己的自定义
RejectedExecutionHandler
,会怎样?

注意!我自己没有尝试过这个。这只是一个想法。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

class MyHandler implements RejectedExecutionHandler {
    final BlockingQueue<Runnable> task_queue;
    final long timeout;
    final TimeUnit unit;

    public MyHandler(
        BlockingQueue<Runnable> task_queue,
        long timeout,
        TimeUnit unit
    ) {
        this.task_queue = task_queue;
        this.timeout = timeout;
        this.unit = unit;
    }

    @override
    public void rejectedExecution(
        Runnable task,
        ThreadPoolExecutor pool
    ) {
        boolean timed_out = false;
        try {
            timed_out = ! task_queue.offer(task, timeout, unit);
        }
        catch (Exception ex) {
            ...use your shameful imagination here...
        }
        if (timed_out) {
            throw new RejectedExecutionException("queue is full");
        }
    }
}

然后,创建执行器:

BlockingQueue<Runnable> task_queue = new ArrayBlockingQueue<>(...);
ExecutorService pool = new ThreadPoolExecutor(
    corePoolSize,  maximumPoolSize,
    keepAliveTime, keepAliveTimeUnit,
    task_queue,
    new MyHandler(task_queue, submitTimeout, submitTimeoutUnit)
);

这有点 hack-y,因为它弄乱了执行器背后的队列,但我的期望是,如果当你的代码调用

pool.submit(task)
时队列已满,那么处理程序的
rejectedExecution()
方法将被调用。它将等待指定的时间将任务添加到队列中。

如果成功,则任务将添加到队列中,每个人都很高兴。如果失败,则

RejectedExecutionException
将传播回
pool.submit(task)
的调用者以捕获并处理您认为合适的方式。


0
投票

这是标准库中缺少的东西,也是 LMAX Disruptor 库存在的原因:

https://lmax-exchange.github.io/disruptor/

我的经验是它使并发消息队列变得轻而易举 - 根本不需要线程代码。需要注意的是,最低延迟等待策略需要大量 CPU 资源。

© www.soinside.com 2019 - 2024. All rights reserved.