春季启动,多网,多模块,@Transactional,parallelStream

问题描述 投票:1回答:1

我正在尝试将大约50k记录插入db。我们使用了AbstractRoutingDataSource,它使用TenantContext来解析数据源,TenantContext是一个实用程序类,具有私有的静态最终ThreadLocal CURRENT_TENANT = new ThreadLocal <>();

[当我使用并行流时,或者如果我尝试使用方法@Async时,出现以下错误

代码:

             .parallelStream()
             .forEach(row -> {
                 TenantContext.setCurrentTenant(centerCd);
                 someDao.insert(row);
             });

错误:


org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.lang.IllegalStateException: Cannot determine target DataSource for lookup key [null]
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:305)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot determine target DataSource for lookup key [null]
    at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.determineTargetDataSource(AbstractRoutingDataSource.java:207)
    at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.getConnection(AbstractRoutingDataSource.java:169)
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:262)
    ... 10 common frames omitted

spring multithreading spring-boot transactions multi-module
1个回答
0
投票

它的工作原理与您描述的完全相同:您的TenantContext正是ThreadLocal,并且存在于一个线程中,该线程由parallelStream()Async方法启动。 (实际上,AsyncforEach方法内部的调用是来自runRunnable

试图在线程开始时插入/解析数据源:因为必须在创建线程时(在Runnable进入run方法之前启动事务)。而且,此时您尚未指定租户,则稍后将在TenantContext.setCurrentTenant(centerCd)方法实现中执行调用run

我建议将这种结构应用于您的代码:

class TenantAwareThread extends Thread {

    public TenantAwareThread(Runnable target, TenantData tenantData) {
        super(target);
        TenantContext.setCurrentTenant(tenantData);
    }
}

@Autowired
TaskExecutor executor;

void startTask(TenantData tenantData, RowData row) {
    executor.execute(
        new TenantAwareThread(() -> {
            someDao.insert(row);
        },
        tenantData));
}

您创建了一个新的线程类型,它从一开始就知道租户数据。并且只需将您的执行包装到此类线程中即可。

© www.soinside.com 2019 - 2024. All rights reserved.