如何在 Spring 批处理中编写租户感知的 RepositoryItemReader?

问题描述 投票:0回答:1

我有一个作业配置为基于作业参数运行,并与 spring web 和quartz 集成以根据需求和基于 cron 进行调用。我正在使用 RepositoryItemReader 来利用 spring 数据。这正在按预期运行。

现在我想在工作中引入多租户。我有 3 个租户,它们具有不同的数据库,例如租户 1、租户 2 和租户 3。基本上我想运行批处理作业,根据作业参数从数据库中选取数据。如果作业参数是tenant1,我想从tenant1数据库中选取数据。

我在这里找到了一篇关于如何在 Spring Boot 应用程序中引入多租户的文章。 https://www.baeldung.com/multitenancy-with-spring-data-jpa 问题是我无法理解在哪里可以将上下文注入线程,因为我正在使用 AsyncTaskScheduler 来启动作业,并且还有其他作业也在上下文中注册。

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("tenantId",tenantId)
                .addString("jobName",jobName)
                .addLong("time", System.currentTimeMillis()).toJobParameters();
        Job job = jobRegistry.getJob(jobName);
        JobExecution jobExecution = asyncJobLauncher.run(job, jobParameters);

我的 itemReader bean 被描述为

    @StepScope
    @Bean
    public ItemReader<Person> itemReader() {
        return new RepositoryItemReaderBuilder<Person>()
                .name("ItemReader")
                .repository(personRepository)
                .arguments("personName").methodName("findByPersonNameEquals")
                .maxItemCount(30).pageSize(5)
                .sorts(Collections.singletonMap("createTs", Sort.Direction.ASC)).build();
    }
spring spring-data-jpa spring-data spring-batch multi-tenant
1个回答
0
投票

在使用RepositoryItemReader时,我在我们的项目中遇到了类似的问题。读者无法通过具有默认 TaskExecutorThreadLocal 访问租户。

为了使 RepositoryItemReader 访问 ThreadLocal,我创建了一个自定义装饰器 TaskExecutor 来提供当前的 main ThreadLocal。 参考:[https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/task/TaskDecorator.html][1]

例如:

@Bean(name = "taskExecutorDecorator")
public TaskExecutor taskExecutorDecorator(ThreadPoolTaskExecutorBuilder taskExecutorBuilder) {
  taskExecutorBuilder = taskExecutorBuilder.taskDecorator(
          new TaskDecorator() {
            @Override
            public Runnable decorate(Runnable runnable) {
              String currentTenant = Tenant.get();
              return () -> {
                try {
                  Tenant.set(currentTenant);
                  runnable.run();
                } finally {
                  Tenant.remove();
                }
              };
            }
          });
  return taskExecutorBuilder.build();
}

然后为Step提供taskExecutor。

@Bean
public Step step(
      @Qualifier("taskExecutorDecorator") TaskExecutor taskExecutorDecorator, PlatformTransactionManager transactionManager,
      ...) {
    return new StepBuilder("step", jobRepository)
        .<ClassA, ClassB>chunk(100, transactionManager)
        .taskExecutor(taskExecutorDecorator)
        ....
        .build();
}

希望可以帮到你。

© www.soinside.com 2019 - 2024. All rights reserved.