我正在尝试将大约50k记录插入db。我们使用了AbstractRoutingDataSource,它使用TenantContext来解析数据源,TenantContext是一个实用程序类,具有私有的静态最终ThreadLocal CURRENT_TENANT = new ThreadLocal <>();
[当我使用并行流时,或者如果我尝试使用方法@Async时,出现以下错误
代码:
.parallelStream()
.forEach(row -> {
TenantContext.setCurrentTenant(centerCd);
someDao.insert(row);
});
错误:
org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.lang.IllegalStateException: Cannot determine target DataSource for lookup key [null]
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:305)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot determine target DataSource for lookup key [null]
at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.determineTargetDataSource(AbstractRoutingDataSource.java:207)
at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.getConnection(AbstractRoutingDataSource.java:169)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:262)
... 10 common frames omitted
它的工作原理与您描述的完全相同:您的TenantContext正是ThreadLocal,并且存在于一个线程中,该线程由parallelStream()
或Async
方法启动。 (实际上,Async
或forEach
方法内部的调用是来自run
的Runnable
)
试图在线程开始时插入/解析数据源:因为必须在创建线程时(在Runnable
进入run
方法之前启动事务)。而且,此时您尚未指定租户,则稍后将在TenantContext.setCurrentTenant(centerCd)
方法实现中执行调用run
。
我建议将这种结构应用于您的代码:
class TenantAwareThread extends Thread {
public TenantAwareThread(Runnable target, TenantData tenantData) {
super(target);
TenantContext.setCurrentTenant(tenantData);
}
}
@Autowired
TaskExecutor executor;
void startTask(TenantData tenantData, RowData row) {
executor.execute(
new TenantAwareThread(() -> {
someDao.insert(row);
},
tenantData));
}
您创建了一个新的线程类型,它从一开始就知道租户数据。并且只需将您的执行包装到此类线程中即可。