具有多租户的 Spring Batch

问题描述 投票:0回答:1

好吧,我确信其他人也遇到过这个问题并解决了它,但它似乎超出了我对 Spring 的理解。 我有一个 Spring Boot (2.2.4) 应用程序,我正在使用 Spring Batch 从文本文件中读取一堆记录并将它们导入到 MySQL 数据库中。这工作得很好,但现在我需要使应用程序成为多租户。我能够通过在 (X-TenantID) 中传递标头来使 Spring Boot 应用程序成为多租户,但现在 Spring Batch 不再起作用。为什么? 这是我遇到的错误。

org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
完全例外:


org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
    at org.springframework.dao.support.DataAccessUtils.nullableSingleResult(DataAccessUtils.java:97) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
DataAccessUtils.java:97
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:784) ~[spring-jdbc-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdbcTemplate.java:784
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:809) ~[spring-jdbc-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdbcTemplate.java:809
    at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.synchronizeStatus(JdbcJobExecutionDao.java:308) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
JdbcJobExecutionDao.java:308
    at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:166) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleJobRepository.java:166
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
NativeMethodAccessorImpl.java:62
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
DelegatingMethodAccessorImpl.java:43
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
Method.java:498
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
AopUtils.java:344
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:198
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:163
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
TransactionAspectSupport.java:366
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:99) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
TransactionInterceptor.java:99
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:186
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdkDynamicAopProxy.java:212
    at com.sun.proxy.$Proxy182.update(Unknown Source) ~[na:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
NativeMethodAccessorImpl.java:62
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
DelegatingMethodAccessorImpl.java:43
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
Method.java:498
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
AopUtils.java:344
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:198
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:163
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleBatchConfiguration.java:127
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:186
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdkDynamicAopProxy.java:212
    at com.sun.proxy.$Proxy215.update(Unknown Source) ~[na:na]
    at org.springframework.batch.core.job.AbstractJob.updateStatus(AbstractJob.java:440) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
AbstractJob.java:440
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:314) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleJobLauncher.java:147
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Thread.java:748

编辑: 我正在使用 HandlerInterceptorAdapter 设置租户 id...

public class TenantInterceptor extends HandlerInterceptorAdapter {

  private String defaultHeader = "X-TENANT-ID";
  private String defaultTenantId = "default";

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    log.debug("TenantInterceptor::preHandle");
    log.debug("Default Header: " + defaultHeader);
    log.debug("Default Tenant ID: " + defaultTenantId);
    try {
      String tenantId = defaultTenantId;
      if (defaultHeader.equalsIgnoreCase("Authorization")) {
        log.debug("Using the 'Authorization' header so we need to get the value from the JWT");
        String authToken = request.getHeader(defaultHeader);
        try {
          tenantId = JwtUtil.getClaim(JwtUtil.getTokenFromBearer(authToken).get(), "tenantId");
        } catch (Exception ex) {
          ex.printStackTrace();
        }
      } else if (defaultHeader.equalsIgnoreCase("X-TENANT-ID")) {
        log.debug("Using the 'X-TENANT-ID' header so just get the value from it.");
        tenantId = request.getHeader(defaultHeader);
        log.debug("TenantID: " + tenantId);
        if (tenantId == null || tenantId.isEmpty()) {
          tenantId = defaultTenantId;
        }
      }
      // TODO: Security checks
      // We need to ensure that the user making the request has the permissions
      // to make a request for the this tenant (Client/Customer)!
      TenantContext.setCurrentTenant(tenantId);
    } catch (Exception ex) {
      return false;
    }

    return true;
  }

  @Override
  public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
      ModelAndView modelAndView) throws Exception {
    TenantContext.clear();
  }
}

租户上下文

public class TenantContext {

  private static ThreadLocal<String> currentTenant = new ThreadLocal<>();

  public static String getCurrentTenant() {
    return currentTenant.get();
  }

  public static void setCurrentTenant(String tenant) {
    log.debug("Setting tenant to " + tenant);
    currentTenant.set(tenant);
  }

  public static void clear() {
    currentTenant.set(null);
  }

}

还有我的 BatchConfig

public class BatchConfig {
  @Autowired
  JobRepository jobRepository;

  @Autowired
  EntityManagerFactory emf;

  @Autowired
  WheelItemDao wheelItemDao;

  @Autowired
  DuplicateJurorDao dupJurorDao;

  @Autowired
  JurorDao jurorDao;

  @Autowired
  EventDao eventDao;

  // @Autowired
  // DataSource dataSource;

  @Bean
  public BatchConfigurer batchConfigurer(EntityManagerFactory emf) {
    return new DefaultBatchConfigurer() {
      @Override
      public PlatformTransactionManager getTransactionManager() {

        return new JpaTransactionManager(emf);
      }
    };
  }

  @Bean
  public Job wheelImportJob(JobBuilderFactory jobs, StepBuilderFactory steps,
      ItemProcessor<WheelItem, Juror> jurorProcessor, ItemProcessor<WheelItem, WheelItem> wheelProcessor,
      JobExecutionListener listener) {

    // @formatter:off
    Step wheelImport = steps.get("wheelImport")
      .<WheelItem, WheelItem>chunk(100)
      .reader(reader(null))
      .processor(wheelProcessor)
      .writer(wheelWriter())
      .build();

    Step jurorImport = steps.get("jurorImport")
      .<WheelItem, Juror>chunk(1)
      .reader(wheelItemReader())
      .processor(jurorProcessor)
      .writer(writer())
      .taskExecutor(taskExecutor())
      .throttleLimit(125)
      .build();

    return jobs.get("wheelImportJob")
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .start(wheelImport)
      .next(jurorImport)
      .build();
    // @formatter:on
  }

  @Bean
  public JpaItemWriter<Juror> writer() {
    JpaItemWriter<Juror> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);
    return writer;
  }

  @Bean
  public JpaItemWriter<WheelItem> wheelWriter() {
    JpaItemWriter<WheelItem> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);
    return writer;
  }

  @Bean
  public ItemReader<WheelItem> wheelItemReader() {
    JpaPagingItemReader<WheelItem> reader = new JpaPagingItemReader<>();
    reader.setEntityManagerFactory(emf);
    reader.setQueryString("select w from WheelItem w");
    reader.setPageSize(5);
    try {
      reader.afterPropertiesSet();
    } catch (Exception e) {
      e.printStackTrace();
    }

    return reader;
  }

  @Bean
  public JobExecutionListener listener() {

    return new JobExecutionListener() {

      @Override
      public void beforeJob(JobExecution jobExecution) {
        // @formatter:off
        eventDao.save(
          Event.builder()
            .active(true)
            .eventType(EventType.RECONTITUTE_WHEEL_STARTED.id)
            .note(EventType.RECONTITUTE_WHEEL_STARTED.label)
            .createdDt(LocalDateTime.now())
            .updatedDt(LocalDateTime.now())
            .build());
        // @formatter:on
        wheelItemDao.truncateTable();
        dupJurorDao.truncateTable();
        // De-activate all current Jurors.
        jurorDao.updateActive(false);
        JurorProcessor.counter = 0;
      }

      @Override
      public void afterJob(JobExecution jobExecution) {
        // @formatter:off
        eventDao.save(
          Event.builder()
            .active(true)
            .eventType(EventType.RECONTITUTE_WHEEL_FINISHED.getId())
            .note(EventType.RECONTITUTE_WHEEL_FINISHED.getLabel())
            .createdDt(LocalDateTime.now())
            .updatedDt(LocalDateTime.now())
            .build());
        // @formatter:on
      }
    };
  }

  @Bean
  public ChunkExecutionListener chunkListener() {
    return new ChunkExecutionListener();
  }

  @Bean
  @StepScope
  public synchronized FlatFileItemReader<WheelItem> reader(
      @Value("#{jobParameters[fullPathFileName]}") String pathToFile) {
    FlatFileItemReader<WheelItem> flatFileItemReader = new FlatFileItemReader<>();
    flatFileItemReader.setResource(new FileSystemResource(pathToFile));
    flatFileItemReader.setName("Wheel-Reader");
    flatFileItemReader.setLineMapper(lineMapper());
    return flatFileItemReader;
  }

  @Bean
  public LineMapper<WheelItem> lineMapper() {
    DefaultLineMapper<WheelItem> defaultLineMapper = new DefaultLineMapper<>();
    FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
    tokenizer.setStrict(false);
    // @formatter:off
    tokenizer.setNames(
        ...
     );
    tokenizer.setColumns(
        ...
    );

    // @formatter:on
    defaultLineMapper.setLineTokenizer(tokenizer);

    BeanWrapperFieldSetMapper<WheelItem> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(WheelItem.class);

    defaultLineMapper.setFieldSetMapper(fieldSetMapper);
    return defaultLineMapper;
  }

  @Bean
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(100);
    taskExecutor.setMaxPoolSize(150);
    taskExecutor.initialize();

    return taskExecutor;
  }

  @Bean(name = "myJobLauncher")
  public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
  }
}

批处理作业正在通过 HTTP 调用启动到我的 RestController。

JobExecution jobExec = jobLauncher.run(
          job, 
          new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .addString("fullPathFileName", UPLOADED_FOLDER + UPLOADED_FILE).toJobParameters());
java spring spring-boot multi-tenant
1个回答
0
投票

我用装饰器解决了类似的问题。请尝试一下:

static class ContextAwareTaskDecorator implements TaskDecorator {

    @Override
    @NonNull
    public Runnable decorate(@NonNull final Runnable runnable) {
        final var tenant = TenantContext.getCurrentTenant();
        return () -> {
            TenantContext.setCurrentTenant(tenant);
            runnable.run();
        };
    }
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(100);
    taskExecutor.setMaxPoolSize(150);
    taskExecutor.initialize();
    taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator());
    return taskExecutor;
}
© www.soinside.com 2019 - 2024. All rights reserved.