我想让春季批量工作变得临时化。也就是说,我有一个包含超过 100 000 条记录的 CSV 文件。我想每天分块阅读这个文件。例如,第一天的前 100 条记录,然后是第二天的下 100 条记录,依此类推。这是为了最大限度地减少消耗这些数据的系统的负载。
@Configuration
@AllArgsConstructor
public class SpringBatchConfig {
private final GrpDataWriter writer;
@Bean
public FlatFileItemReader<GrpData> reader() {
FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("src/main/resources/grpData.csv"));
itemReader.setName("dataReader");
itemReader.setLinesToSkip(1);
itemReader.setLineMapper(lineMapper());
return itemReader;
}
private LineMapper<GrpData> lineMapper() {
DefaultLineMapper<GrpData> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setDelimiter(",");
lineTokenizer.setStrict(false);
lineTokenizer.setNames("id", "first_name", "last_name", "email");
BeanWrapperFieldSetMapper<GrpData> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(GrpData.class);
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return lineMapper;
}
@Bean
public GrpDataProcessor processor() {
return new GrpDataProcessor();
}
@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step", jobRepository)
.<GrpData, GrpData>chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importGrpData", jobRepository)
.flow(step(jobRepository, transactionManager)).end().build();
}
}
@Component
@AllArgsConstructor
public class JobScheduler {
private final JobLauncher jobLauncher;
private final Job job;
private static int count = 0;
@Scheduled(fixedDelay = 6000, initialDelay = 5000)
public void scheduleJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("startsAt", System.currentTimeMillis()).toJobParameters();
try{
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
为了实现这一目标,我应该在这里做出什么改变?或者我应该创建一个不同的调度程序? 预先感谢。
我看到你已经有了
itemReader.setLinesToSkip(1)
。您可以通过使用 linesToSkip
和 maxItemCount
参数来实现此目的。第一天,linesToSkip=1
和maxItemCount=100
,第二天你可以设置linesToSkip=101
和maxItemCount=100
,等等。这样,作业每天只会处理一百个项目。输入文件 + 偏移量(即 linesToSkip
值)应该是一个作业参数,用于生成不同的作业实例,以便在失败时可以重新启动。
我个人会避免这种情况,如果可能的话,宁愿拆分文件。完成后,我将每天启动一个作业实例,并将文件分区作为作业参数。