我有一个类 DatabaseContextHolder,它使用 ThreadLocal 根据用户的 API 密钥管理数据源。
public final class DatabaseContextHolder {
private static final ThreadLocal<Stack<DataSourceType>> ctx = new ThreadLocal<>();
public static void setCtx(DataSourceType dataSourceType) {
getCtx().push(dataSourceType);
}
public static void restoreCtx() {
final Stack<DataSourceType> ctx = getCtx();
if (!ctx.isEmpty()) {
ctx.pop();
}
}
public static Optional<DataSourceType> peekDataSource() {
final Stack<DataSourceType> ctx = getCtx();
if (ctx.isEmpty()) {
return Optional.empty();
}
return Optional.of(ctx.peek());
}
private static Stack<DataSourceType> getCtx() {
if (ctx.get() == null) {
ctx.set(new Stack<>());
}
return ctx.get();
}
}
无需多线程即可正常工作。但是,当我使用 CompletableFuture 时,异步任务中的 ctx 字段始终为空。 CompletableFuture 的方法示例:
private final ExecutorService executorService = Executors.newCachedThreadPool();
public ResponseEntity<Map<String, List<ReportGroupScheduleExecutionBean>>> getScheduleExecutionByGroup(@RequestParam(value = "accountId") int accountId,
@RequestParam("dateFrom") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateFrom,
@RequestParam("dateTo") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateTo,
@RequestParam(name = "groupIds") Set<Integer> groupIds,
@AuthenticationPrincipal UserBean userBean) throws NoDataException {
ZoneId userTimeZone = ZoneId.of(userBean.getUserTimeZone());
DateTimeFormatter zoneTimeFormatter = DateTimeFormatter.ofPattern(ModelsConstants.PATTERN_TIME_ONLY_MINUTES).withZone(userTimeZone);
AccountEntity account = accountService.findById(accountId).orElseThrow(() -> new NoDataException("Account not exist"));
List<CompletableFuture<List<ReportGroupScheduleExecutionBean>>> futures = new ArrayList<>();
groupIds.forEach(id -> {
CompletableFuture<List<ReportGroupScheduleExecutionBean>> future = CompletableFuture.supplyAsync(
() -> {
try {
return reportService.getRouteScheduleExecutionReportByGroup(new ReportInfoBean(
dateFrom, dateTo, account, userBean.getUserTimeZone(), zoneTimeFormatter), id);
} catch (NoDataException e) {
throw new RuntimeException(e);
}
}, executorService);
futures.add(future);
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<ReportGroupScheduleExecutionBean>> futuresAll = allFutures.thenApply
(v -> futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList())
);
try {
return new ResponseEntity<>(futuresAll.get().stream().collect(Collectors.groupingBy(ReportGroupScheduleExecutionBean::getAccountName)), HttpStatus.OK);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
我尝试过的:
如何确保 ThreadLocal 上下文正确传播到 CompletableFuture 生成的线程?
如何确保 ThreadLocal 上下文正确传播到 CompletableFuture 生成的线程?
您对
ThreadLocal
的看法似乎完全错误。重点是给定的 ThreadLocal
对象为每个线程保存不同的、独立的值。无论一个线程 set()
的值是什么,该线程 都可以 get()
从它返回。目标是避免在线程之间共享数据。
因此...
当我使用 CompletableFuture 时,异步任务中的 ctx 字段始终为空
是的!异步任务在与您在
ThreadLocal
中设置值的线程不同的线程中运行,因此它们不应该在 ThreadLocal
中看到该值。也没有它的副本或任何类似的东西。他们要么得到它的初始值,要么得到他们最后自己设置的任何值。
您可以通过使
ThreadLocal
实例向所有线程提供相同的对象作为其初始值来部分打破这一点,但在大多数情况下这是不合适的。如果您想要一个共享对象,则可以省略 ThreadLocal
并直接设置这样的对象。在这种情况下,ThreadLocal
不会为您做任何有用的事情。特别是,它不为以这种方式暴露给多个线程的共享对象提供任何类型的同步。