用 Executor 替换 BlockingQueue + 守护线程

问题描述 投票:0回答:1

我有一个带有任务的

BlockingQueue
,以及执行它们的单个守护线程:

public class TaskManager {

  private final BlockingQueue<Task> taskQueue;

  public TaskManager() {
    this.taskQueue = new ArrayBlockingQueue<>(5); //Max 5 tasks waiting
    Thread taskRunner = new Thread(this::runTask);
    taskRunner.setDaemon(true); //Infinite loop inside shouldn't prevent JVM shutdown
    taskRunner.start();
  }

  //Called async
  public void enqueueTask(Task task) {
    boolean added = false;
    try {
        added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if (!added) {
        publishEvent(new Fail(task)); //Async notify the client their task won't run
    }
  }

  //Also called async
  public void dequeueTask(Task task) {
    this.taskQueue.remove(task);
  }

  void runTask() {
    while (true) {
      try {
        Task task = taskQueue.take(); //Blocks indefinitely
        doStuff(task);
      } catch (InterruptedException e) {
          //Can only be a spurious interruption. Ignore.
      }
    }
  }
}

看起来并没有什么特别的。但。感觉就像我正在通过自己管理队列和守护进程来重新发明轮子。我希望用

Executor(Service)
可以轻松做到这一点。我可以有一个带有 1 个线程的
ThreadPoolExecutor
,给它一个有界队列。它为我提供了一个
remove
方法来使任务出队。但如果排队时间太长,似乎没有办法超时(在我的示例中,
taskQueue.offer()
超时 5 秒)。

我所做的事情是否已经合法,只是我想太多了?如果没有,我可以用

Executor
替换它吗?如何替换?

java multithreading concurrency queue
1个回答
0
投票

重要的是要理解,

ThreadPoolExecutor
永远不会等待队列变得“未满”。它使用
offer
,如果容量耗尽,它会立即返回
false
,如果发生这种情况并且已达到配置的最大线程大小,它将调用
RejectedExecutionHandler

因此,您可以将最大线程数指定为 1 并使用以下内容

RejectedExecutionHandler
:

RejectedExecutionHandler reh = (r, es) -> {
    boolean added = false;
    try {
        BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
        added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if(!added) {
        //assuming the Runnable and "Task" are interchangeable
        publishEvent(new Fail(r));
    }
};

使用构造函数

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

© www.soinside.com 2019 - 2024. All rights reserved.