Initialize a secondary DataSource at runtime / lazily based on a JobParameter


I'm developing a Spring Batch application where a batch DataSource is configured during context startup (as is standard), and I want a secondary DataSource that has to be initialized based on a job parameter named "country".
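The job is launched with the country passed as a job parameter, along these lines ("ita" here is just an illustrative value):

JobParameters jobParameters = new JobParametersBuilder()
    .addString("country", "ita") // selects which secondary instance to use
    .toJobParameters();
jobLauncher.run(job, jobParameters);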

I have secondary DataSource properties configured for each country:

#LTU
country.instances.ltu.url=...
country.instances.ltu.username=...
country.instances.ltu.password=...
country.instances.ltu.driver-class-name=...
#ITA
country.instances.ita.url=...
country.instances.ita.username=...
country.instances.ita.password=...
country.instances.ita.driver-class-name=...
#FRA
country.instances.fra.url=...
country.instances.fra.username=...
country.instances.fra.password=...
country.instances.fra.driver-class-name=...
... and so on
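These entries bind to a @ConfigurationProperties class along these lines (a sketch; class and field names are inferred from the usage in the configuration below):

import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Registered via @EnableConfigurationProperties or @ConfigurationPropertiesScan.
@ConfigurationProperties(prefix = "country")
public class CountryProperties {

  // Keyed by instance name ("ltu", "ita", "fra", ...), matching the 'country' job parameter.
  private Map<String, InstanceProperties> instances = new HashMap<>();

  public Map<String, InstanceProperties> getInstances() { return instances; }
  public void setInstances(Map<String, InstanceProperties> instances) { this.instances = instances; }

  public static class InstanceProperties {
    private String url;
    private String username;
    private String password;
    private String driverClassName; // bound from driver-class-name via relaxed binding

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getDriverClassName() { return driverClassName; }
    public void setDriverClassName(String driverClassName) { this.driverClassName = driverClassName; }
  }
}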

I have a @Configuration class for the data sources:

@Configuration
public class DataSourceConfig {

  @Autowired
  private CountryProperties countryProperties; // @ConfigurationProperties(prefix = "country")

  // Batch Datasource configurations ...

  // Job-scoped: the 'country' job parameter is late-bound per job execution
  @Bean
  @JobScope
  public DataSource secondaryDataSource(
      @Value("#{jobParameters['country']}") String instanceName
  ) {
    InstanceProperties instanceProperties = countryProperties.getInstances()
        .get(instanceName);
    Assert.notNull(instanceProperties, "instance configuration is required");
    DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create()
        .driverClassName(instanceProperties.getDriverClassName())
        .url(instanceProperties.getUrl())
        .username(instanceProperties.getUsername())
        .password(instanceProperties.getPassword());

    return dataSourceBuilder.build();
  }

  @Bean
  public DataSourceTransactionManager secondaryTransactionManager() {
    return new DataSourceTransactionManager(secondaryDataSource(null));
  }
}

I also have another @Configuration class for the job, where the DataSource / transaction manager are autowired:

@Configuration
public class TestJob {

  @Autowired
  protected JobRepository jobRepository;

  @Autowired
  protected JobLauncher jobLauncher;

  @Autowired
  protected DataSource secondaryDataSource;

  @Autowired
  @Qualifier("secondaryTransactionManager")
  protected PlatformTransactionManager secondaryTransactionManager;

  @Bean
  public Job job() {
    return new JobBuilder("testJob", jobRepository)
        .incrementer(new RunIdIncrementer())
        .start(step())
        .build();
  }

  @Bean
  public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("thread-n");
    return simpleAsyncTaskExecutor;
  }

  @Bean
  public ColumnRangePartitioner columnRangePartitioner() {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
    columnRangePartitioner.setDataSource(secondaryDataSource);
    columnRangePartitioner.setTable("GLB_STORAGE_DOCUMENTS");
    return columnRangePartitioner;
  }

  // Partitioned master step: partitions run concurrently on taskExecutor() threads
  @Bean
  public Step step() {
    return new StepBuilder("saveDataToCSVStep", jobRepository)
        .partitioner(slaveStep().getName(), columnRangePartitioner())
        .step(slaveStep())
        .gridSize(2)
        .taskExecutor(taskExecutor())
        .build();
  }

  @Bean
  public Step slaveStep() {
    return new StepBuilder("step", jobRepository)
        .<StorageDocument, StorageDocument>chunk(100, secondaryTransactionManager)
        .reader(itemReader())
        .writer(itemWriter())
        .build();
  }

  @Bean
  public JdbcCursorItemReader<StorageDocument> itemReader() {
    return new JdbcCursorItemReaderBuilder<StorageDocument>()
        .dataSource(secondaryDataSource)
        .name("reader")
        .sql("select STORAGE_DOCUMENT_ID from GLB_STORAGE_DOCUMENTS")
        .rowMapper(new StorageDocumentRowMapper())
        .build();

  }

  @Bean
  public FlatFileItemWriter<StorageDocument> itemWriter() {
    BeanWrapperFieldExtractor<StorageDocument> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(
        new String[]{"storageDocumentId"});
    DelimitedLineAggregator<StorageDocument> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    lineAggregator.setFieldExtractor(fieldExtractor);
    return new FlatFileItemWriterBuilder<StorageDocument>()
        .name("flatFileItemWriter")
        .resource(new FileSystemResource("target/test.csv"))
        .headerCallback(writer -> writer.write("STORAGEDOCUMENTID"))
        .lineAggregator(lineAggregator)
        .build();
  }
}

When I run the batch job I get this error:

2024-04-15T12:47:28,823 ERROR [thread-n1] o.s.b.c.s.AbstractStep: Encountered an error executing step step in job testJob
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:155)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:124)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:292)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:226)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.lambda$createTask$0(TaskExecutorPartitionHandler.java:132)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.secondaryDataSource': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:373)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:200)
    at jdk.proxy2/jdk.proxy2.$Proxy61.getConnection(Unknown Source)
    at org.springframework.batch.item.database.AbstractCursorItemReader.initializeConnection(AbstractCursorItemReader.java:450)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:430)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152)
    ... 7 more
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:157)
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:361)
    ... 14 more

From what I've found around, it seems I've hit this "catch":

"There are some practical limitations of using job-scoped beans in multi-threaded or partitioned steps. Spring Batch does not control the threads spawned in these use cases, so it is not possible to set them up correctly to use such beans. Hence, it is not recommended to use job-scoped beans in multi-threaded or partitioned steps."

Since job-scoped beans are not recommended in multi-threaded or partitioned steps, how do I get out of this situation?

How can I configure a DataSource at runtime / lazily from JobParameters in Spring Batch?

P.S. I'm open to suggestions that change the "design" of the batch to meet this requirement.

java spring spring-boot spring-batch
1 Answer

I ran into a similar problem where I had to set the DataSource dynamically at runtime; this is what helped me.
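One way that avoids job scope entirely (a minimal sketch, not a drop-in solution): build one DataSource per country up front and route between them with Spring's AbstractRoutingDataSource, keyed off the 'country' job parameter. The CountryContextHolder below is a hypothetical helper; it assumes only one job runs per JVM at a time, so the partition worker threads all see the same key.

import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

// Hypothetical helper: one routing key shared by every thread of the running job.
final class CountryContextHolder {
  private static volatile String country;
  static void set(String value) { country = value; }
  static String get() { return country; }
  static void clear() { country = null; }
}

@Configuration
public class RoutingDataSourceConfig {

  @Autowired
  private CountryProperties countryProperties;

  // A plain singleton bean: one target DataSource per country, selected per job
  // by the routing key. No job scope involved, so partition worker threads can use it.
  @Bean
  public DataSource secondaryDataSource() {
    Map<Object, Object> targets = new HashMap<>();
    countryProperties.getInstances().forEach((name, props) ->
        targets.put(name, DataSourceBuilder.create()
            .driverClassName(props.getDriverClassName())
            .url(props.getUrl())
            .username(props.getUsername())
            .password(props.getPassword())
            .build()));
    AbstractRoutingDataSource routing = new AbstractRoutingDataSource() {
      @Override
      protected Object determineCurrentLookupKey() {
        return CountryContextHolder.get();
      }
    };
    routing.setTargetDataSources(targets);
    return routing;
  }

  // Sets the routing key from the 'country' job parameter before the steps run.
  @Bean
  public JobExecutionListener countryListener() {
    return new JobExecutionListener() {
      @Override
      public void beforeJob(JobExecution jobExecution) {
        CountryContextHolder.set(jobExecution.getJobParameters().getString("country"));
      }

      @Override
      public void afterJob(JobExecution jobExecution) {
        CountryContextHolder.clear();
      }
    };
  }
}

The job then has to register the listener, e.g. new JobBuilder("testJob", jobRepository).listener(countryListener())..., and secondaryTransactionManager can wrap the routing DataSource directly. If creating every country's DataSource eagerly is too expensive, the target map can instead be populated lazily in beforeJob.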
