我使用spring boot创建了一个简单的spring批处理作业,该作业从我们的数据库读取并写入主题。我还有一个钩子,我可以注释掉topicWriter并在开发过程中写入csv文件。两者都在通过评论一个并运行另一个作者来工作。 (topicWriter或writer)。现在,企业希望能够运行adhoc,主题或作者。所以我选择传入包含topic或csv的输出参数。阅读后看起来我可以使用决策者,但这可能是错误的。现在看来,下面的代码抱怨重复步骤,并在我尝试运行时循环。我无法弄清楚如何在没有启动步骤的情况下运行,所以我创建了一个无所事事的tasklet,因为该作业需要一个开始步骤决定者。所以我想我搞砸了这一切。任何解决方案或方向的想法?
@Bean
public Job job(@Qualifier("step") Step step) {
return jobBuilderFactory.get(BatchConstants.JOB_NAME).listener(jobListener())
.start(step).next(decider()).on("COMPLETED").to(step1(null,null)).from(decider()).on("FAILED").to(step2(null,null)).end().build();
}
@Bean
protected Step step() {
return stepBuilderFactory.get("step")
.tasklet(new Tasklet() {
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
protected Step step1(ItemReader<someDto> reader,
ItemWriter<someDto> topicWriter) {
return stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<someDto, someDto> chunk(BatchConstants.CHUNKSIZE)
.reader(reader)
.writer(topicWriter) // write to kafka topic.
.build();
}
@Bean
protected Step step2(ItemReader<someDto> reader,
ItemWriter<someDto> writer) {
return stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<someDto, someDto> chunk(BatchConstants.CHUNKSIZE)
.reader(reader)
.writer(writer) // writes to csv
.build();
}
在单步中,您可以使用@StepScpoe定义它。根据作业参数,您可以选择编写器。
@Bean
@StepScope
protected Step step2(ItemReader<someDto> reader,
ItemWriter<someDto> writer ,ItemWriter<someDto> topicWriter,"#{jobParameters['writerType']}") final String type ) {
ItemWriter<someDto> myWriter;
if(type.equals("topic"))
{
myWriter=topicWriter;
}
else
{
myWriter=writer;
}
return stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<someDto, someDto> chunk(BatchConstants.CHUNKSIZE)
.reader(reader)
.writer(myWriter)
.build();
}