目前我正在使用基于 Mongo DB 的作业存储库将 Spring Boot 2.7.x (Spring Batch 4.3.x) 迁移到 Spring Boot 3.0.x (Spring Batch 5.0.x)
@Configuration
public class MainBatchConfigurer implements BatchConfigurer {
@Autowired
private ExecutionContextDao mongoExecutionContextDao;
@Autowired
private JobExecutionDao mongoJobExecutionDao;
@Autowired
private JobInstanceDao mongoJobInstanceDao;
@Autowired
private StepExecutionDao mongoStepExecutionDao;
@Override
public JobRepository getJobRepository() {
return new SimpleJobRepository(mongoJobInstanceDao, mongoJobExecutionDao, mongoStepExecutionDao, mongoExecutionContextDao);
}
@Override
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
@Override
public SimpleJobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Override
public JobExplorer getJobExplorer() {
return new SimpleJobExplorer(mongoJobInstanceDao, mongoJobExecutionDao, mongoStepExecutionDao, mongoExecutionContextDao);
}
}
由于“BatchConfigurer”接口已被删除,我计划更改“MainBatchConfigurer”,如下所示
@Configuration
public class MainBatchConfigurer {
@Autowired
private ExecutionContextDao mongoExecutionContextDao;
@Autowired
private JobExecutionDao mongoJobExecutionDao;
@Autowired
private JobInstanceDao mongoJobInstanceDao;
@Autowired
private StepExecutionDao mongoStepExecutionDao;
@Bean
public JobRepository jobRepository() {
return new SimpleJobRepository(mongoJobInstanceDao, mongoJobExecutionDao, mongoStepExecutionDao, mongoExecutionContextDao);
}
@Bean
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
public TaskExecutorJobLauncher getJobLauncher() throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobExplorer getJobExplorer() {
return new SimpleJobExplorer(mongoJobInstanceDao, mongoJobExecutionDao, mongoStepExecutionDao, mongoExecutionContextDao);
}
}
之前我一直在调用该作业
SimpleJobLauncher jobLauncher = batchConfigurer.getJobLauncher();
JobExecution jobExecution = jobLauncher.run(xxx, jobParameters);
对于 Batch 5.0.x,我在创建步骤时需要传递 Jobrepository 和 TransationManager,在创建作业时还需要传递 JobRepository。我有很多工作和步骤,我是否需要将这些全部传递出去。有没有办法避免不设置 Jobrepository 和 TransationManager,因为我将其设置为“TaskExecutorJobLauncher”。
我在单独的类中定义了多个作业,每个作业类都有自己的步骤和作业定义,如下所示。这是我定义的 Master、workersteps 和 jobs 示例
@Bean
public Step departmentMigrationStep() throws UnexpectedInputException, ParseException {
return stepBuilderFactory.get("departmentMigratioStep")
.<departments, departments>chunk(10000)
.reader(departmentPeekingreader())
.processor(departmentprocessor())
.writer(departmentwriter)
.listener(genericItemReadListener)
.listener(genericSkipListener)
.listener(genericStepExecutionListener)
.listener(genericChunkListener)
.build();
}
@Bean
public Step departmentValidationStep() throws UnexpectedInputException, ParseException {
return stepBuilderFactory.get("departmentValidationStep")
.<xxx, yyy>chunk(10000)
.reader(departmentPeekingreader)
.processor(departmentValidationProcessor())
.writer(departmentValidationWriter)
.listener(genericChunkListener)
.build();
}
@Bean
public Step deparmentMasterStep() throws UnexpectedInputException, ParseException {
return stepBuilderFactory.get("deparmentMasterStep")
.partitioner("workerStep",departmentPartitioner())
.step(departmentMigrationStep())
.taskExecutor(departmentExecutor())
.gridSize(20)
.build();
}
@Bean
public Step departmentValidationMasterStep() throws UnexpectedInputException, ParseException{
return stepBuilderFactory.get("departmentValidationMasterStep")
.partitioner("workerStep",departmentPartitioner())
.step(departmentValidationStep())
.taskExecutor(departmentExecutor())
.gridSize(20)
.build();
}
@Primary
@Bean
public Job processDepartmentMigration() {
return jobBuilderFactory.get("processDepartmentMigration")
.incrementer(new RunIdIncrementer()).listener(departmentJobListener())
.start(deparmentMasterStep())
.build();
}
@Bean
public Job processdepartmentValidation() {
return jobBuilderFactory.get("processdepartmentValidation")
.incrementer(new RunIdIncrementer()).listener(departmentJobListener())
.start(departmentValidationMasterStep())
.build();
}
我有多个工作类别,每个工作类别都有自己的主控、工作步骤和其中定义的工作
您可以创建一个基类,在其中定义一个方法,该方法返回设置作业存储库的步骤构建器:
class BaseConfiguration {
// define common configuration
protected StepBuilder getStep(String name) {
return new StepBuilder(name, jobRepository());
}
}
然后为每个作业扩展此类并调用创建步骤的方法,如下所示:
class Job1Config extends BaseConfiguration {
//..
@Bean
public Step departmentMigrationStep() {
return super.getStep("departmentMigratioStep")
// ...
.build();
}
}
请注意,事务管理器并不总是需要的,就像在分区步骤中一样。因此,您可以调整基类中的方法或创建不同的方法来返回
TaskletStepBuilder
或 PartitionStepBuilder
等。例如:
protected <I, O> SimpleStepBuilder<I, O> getChunkedStep(String name, int chunkSize) {
return new StepBuilder(name, jobRepository())
.chunk(chunkSize, transactionManager());
}
这可以让您获得与已弃用的
JobBuilderFactory
和 StepBuilderFactory
相同的结果。