当我调用异步方法并使用 ThreadPoolExecutorService 时,收到以下错误。
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("SyncThread");
executor.initialize();
return executor;
}
@RequestMapping(value = "/coupons/bulk/syncCoupons", method = RequestMethod.POST)
@Async("taskExecutor")
public ResponseEntity<?> syncShopifyCouponsV4(@RequestBody List<Long> ids) {
logger.info("Start :: syncShopifyPriceRulesListToMongoV3 for ids size = " + ids.size());
try {
if (!ids.isEmpty()) {
List<CompletableFuture<Void>> futures = ids.stream()
.map(id -> CompletableFuture.runAsync(() -> {
logger.info("id inside CompletableFuture: " + id);
ShopifyBulkSyncCoupons task = shopifyBulkSyncCoupons;
task.setCouponId(id);
task.run(); // Manually invoke the run method
}, taskExecutor)) // Use your configured taskExecutor here
.collect(Collectors.toList());
CompletableFuture<Void>[] futureArray = futures.toArray(new CompletableFuture[0]);
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureArray);
allOf.join(); // Wait for all tasks to complete
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("End :: syncShopifyPriceRulesListToMongoV3");
return new ResponseEntity<>(HttpStatus.OK);
}
我尝试删除行 executor.setQueueCapacity(10);但仍然遇到同样的错误。有人可以指出这里出了什么问题吗?
注意: 这个答案是一种正在进行中的工作。
代码问题:
@Async
在控制器方法上,应该在服务方法上(包括控制器中的代码)或正确利用异步servlet API。ShopifyBulkSyncCoupons
来重新使用 id
属性是很麻烦的事情。@Async
服务方法@Async
应该位于服务方法上,并且控制器中的代码也应该移动到该方法。这样控制器就会快速返回。或者通过从控制器方法返回 [Completable]Future
或 Callable
来正确使用异步 Servlet API。
lambda 内的代码表示在多线程环境中重用 statefuo 单例。不要做这样的事情,因为那会带来麻烦。
ShopifyBulkSyncCoupons task = shopifyBulkSyncCoupons;
task.setCouponId(id);
task.run(); // Manually invoke the run method
服务
public MyProcessingService {
private final AsyncTaskExecutor taskExecutor;
public MyProcessingService(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Async("taskExecutor")
public CompletableFuture<Void> processCoupons(List<Long> ids) {
logger.info("Start :: syncShopifyPriceRulesListToMongoV3 for ids size = " + ids.size());
CompletableFuture<Void>[] futures =
ids.stream()
.map( (id) -> new ShopifyBulkSyncCoupons(id))
.map( (task) -> CompletableFuture.runAsync(task, taskExecutor)
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futures);
}
}
@RequestMapping(value = "/coupons/bulk/syncCoupons", method = RequestMethod.POST)
public CompletableFuture<Void> syncShopifyCouponsV4(@RequestBody List<Long> ids) {
return service.processCoupons(ids);