我有一个应用程序,其中有多个线程从 jms 目标读取消息。侦听器线程读取消息,对其进行一些更改并调用不同类的其他几个方法。这些方法用
@Async
注释进行注释,所有方法都使用自定义 ThreadPoolTaskExecutor
并行执行。
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
executor.initialize();
return executor;
}
到目前为止,所有消息都被认为具有同等优先级,一切都很好,因为如果没有任何
LinkedBlockingQueue
线程可用,所有消息都会进入 Executor
。
现在,有一个要求,即从队列中读取的特定类型的消息预计比从队列中读取的任何其他消息具有更高的优先级。
目前,我正在使用“org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor”,它没有提供任何可以将优先级队列设置为阻塞队列实现的方法。
您能帮我解决这个问题吗? 或者说现有的系统设计无法适应这种变化? 或者处理这种情况的最佳解决方案是什么?
谢谢!
只需重写
createQueue
方法即可。另外,您应该使用 @Bean
方法来创建 bean 的实例,这样 Spring 可以正确管理生命周期,这是一件小而重要的事情(否则关闭将无法正常工作)..
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
};
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
return executor;
}
这样的东西应该有效。
createQueue
方法现在创建一个 PriorityBlockingQueue
而不是默认的 LinkedBlockingQueue
。
据我所知,不可能结合
@Async
来确定任务的优先级。 (前面的答案只展示了如何设置一个带有优先级队列的TaskExecutor
,但没有展示如何指定优先级。)
相反,我会使用明确的
Executor
:
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new PriorityBlockingQueue<>());
然后,定义一个向此执行器提交合适任务的常规方法,而不是
@Async
方法:
var task = new CustomTask(/* stuff needed to run the task and determine the priority */);
executor.executeTask(prioritizedTask);
// if the method is supposed to return a Future:
return task;
最后一块是实现
CustomTask
和 Runnable
的 Comparable
类。如果该方法应该返回 Future
,则该类应该实现 RunnableFuture
而不是仅仅实现 Runnable
。一个简单的实现可以继承自 FutureTask
:
class CustomTask extends FutureTask</* result type */> implements Comparable<CustomTask> {
CustomTask(/* stuff */) {
super(() -> /* do work */);
}
public int compareTo(CustomTask other) {
/* return -1 if this task has a higher priority than the other */
}
}