我正在开发一个 Spring Batch 应用程序,其中在上下文启动期间配置了一个批处理数据源(作为标准),并且我希望有一个必须基于名为“国家/地区”的作业参数进行初始化的辅助数据源。
我为每个国家/地区配置了辅助数据源属性:
#LTU
country.instances.ltu.url=...
country.instances.ltu.username=...
country.instances.ltu.password=...
country.instances.ltu.driver-class-name=...
#ITA
country.instances.ita.url=...
country.instances.ita.username=...
country.instances.ita.password=...
country.instances.ita.driver-class-name=...
#FRA
country.instances.fra.url=...
country.instances.fra.username=...
country.instances.fra.password=...
country.instances.fra.driver-class-name=...
... and so on
我有一个用于数据源的类@Configuration:
@Configuration
public class DataSourceConfig {
@Autowired
private CountryProperties countryProperties; // @ConfigurationProperties(prefix = "country")
// Batch Datasource configurations ...
@Bean
@JobScope
public DataSource secondaryDataSource(
@Value("#{jobParameters['country']}") String instanceName
) {
InstanceProperties instanceProperties = countryProperties.getInstances()
.get(instanceName);
Assert.notNull(instanceProperties, "instance configuration is required");
DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create()
.driverClassName(instanceProperties.getDriverClassName())
.url(instanceProperties.getUrl())
.username(instanceProperties.getUsername())
.password(instanceProperties.getPassword());
return dataSourceBuilder.build();
}
@Bean
public DataSourceTransactionManager secondaryTransactionManager() {
return new DataSourceTransactionManager(secondaryDataSource(null));
}
}
我还有另一个用于自动连接数据源/事务管理器的作业@Configuration:
@Configuration
public class TestJob {
@Autowired
protected JobRepository jobRepository;
@Autowired
protected JobLauncher jobLauncher;
@Autowired
protected DataSource secondaryDataSource;
@Autowired
@Qualifier("secondaryTransactionManager")
protected PlatformTransactionManager secondaryTransactionManager;
@Bean
public Job job() {
return new JobBuilder("testJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(step())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("thread-n");
return simpleAsyncTaskExecutor;
}
@Bean
public ColumnRangePartitioner columnRangePartitioner() {
ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
columnRangePartitioner.setDataSource(secondaryDataSource);
columnRangePartitioner.setTable("GLB_STORAGE_DOCUMENTS");
return columnRangePartitioner;
}
@Bean
public Step step() {
return new StepBuilder("saveDataToCSVStep", jobRepository)
.partitioner(slaveStep().getName(), columnRangePartitioner())
.step(slaveStep())
.gridSize(2)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step slaveStep() {
return new StepBuilder("step", jobRepository)
.<StorageDocument, StorageDocument>chunk(100, secondaryTransactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<StorageDocument> itemReader() {
return new JdbcCursorItemReaderBuilder<StorageDocument>()
.dataSource(secondaryDataSource)
.name("reader")
.sql("select STORAGE_DOCUMENT_ID from GLB_STORAGE_DOCUMENTS")
.rowMapper(new StorageDocumentRowMapper())
.build();
}
@Bean
public FlatFileItemWriter<StorageDocument> itemWriter() {
BeanWrapperFieldExtractor<StorageDocument> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(
new String[]{"storageDocumentId"});
DelimitedLineAggregator<StorageDocument> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<StorageDocument>()
.name("flatFileItemWriter")
.resource(new FileSystemResource("target/test.csv"))
.headerCallback(writer -> writer.write("STORAGEDOCUMENTID"))
.lineAggregator(lineAggregator)
.build();
}
}
当我运行批处理时,出现此错误:
2024-04-15T12:47:28,823 ERROR [thread-n1] o.s.b.c.s.AbstractStep:
Encountered an error executing step step in job testJob
org.springframework.batch.item.ItemStreamException: Failed to
initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:155)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:124)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:292)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:226)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.lambda$createTask$0(TaskExecutorPartitionHandler.java:132)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:1583) Caused by: org.springframework.beans.factory.support.ScopeNotActiveException:
Error creating bean with name 'scopedTarget.secondaryDataSource':
Scope 'job' is not active for the current thread; consider defining a
scoped proxy for this bean if you intend to refer to it from a singleton
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:373)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:200)
at jdk.proxy2/jdk.proxy2.$Proxy61.getConnection(Unknown Source)
at org.springframework.batch.item.database.AbstractCursorItemReader.initializeConnection(AbstractCursorItemReader.java:450)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:430)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152)
... 7 more Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:157)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:361)
... 14 more
从我周围发现的情况来看,我似乎遇到了这个“问题”:
在以下情况中使用作业范围的 Bean 有一些实际限制: 多线程或分区步骤。 Spring Batch 不控制 在这些用例中产生线程,因此无法设置它们 正确使用此类豆子。因此,不建议使用 多线程或分区步骤中的作业范围 bean。
由于不建议在多线程步骤中使用 JobScope,我该如何摆脱这种情况?
如何在 Spring Batch 中在运行时/延迟使用 JobParameters 配置数据源?
附注我愿意接受更改批次“设计”以达到所需要求的建议。
我遇到了类似的问题,必须在运行时动态设置数据源,这对我有帮助。