临时化 Spring 批处理作业

问题描述 投票:0回答:1

我想让春季批量工作变得临时化。也就是说,我有一个包含超过 100 000 条记录的 CSV 文件。我想每天分块阅读这个文件。例如,第一天的前 100 条记录,然后是第二天的下 100 条记录,依此类推。这是为了最大限度地减少消耗这些数据的系统的负载。

@Configuration
@AllArgsConstructor
public class SpringBatchConfig {

    private final GrpDataWriter writer;

    @Bean
    public FlatFileItemReader<GrpData> reader() {
        FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new FileSystemResource("src/main/resources/grpData.csv"));
        itemReader.setName("dataReader");
        itemReader.setLinesToSkip(1);
        itemReader.setLineMapper(lineMapper());
        return itemReader;
    }

    private LineMapper<GrpData> lineMapper() {
        DefaultLineMapper<GrpData> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames("id", "first_name", "last_name", "email");
        BeanWrapperFieldSetMapper<GrpData> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(GrpData.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public GrpDataProcessor processor() {
        return new GrpDataProcessor();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step", jobRepository)
                .<GrpData, GrpData>chunk(10, transactionManager)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("importGrpData", jobRepository)
                .flow(step(jobRepository, transactionManager)).end().build();
    }
}
@Component
@AllArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;

    private final Job job;

    private static int count = 0;

    @Scheduled(fixedDelay = 6000, initialDelay = 5000)
    public void scheduleJob() {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("startsAt", System.currentTimeMillis()).toJobParameters();
        try{
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

为了实现这一目标,我应该在这里做出什么改变?或者我应该创建一个不同的调度程序? 预先感谢。

spring spring-boot spring-batch
1个回答
0
投票

我看到你已经有了

itemReader.setLinesToSkip(1)
。您可以通过使用
linesToSkip
maxItemCount
参数来实现此目的。第一天,
linesToSkip=1
maxItemCount=100
,第二天你可以设置
linesToSkip=101
maxItemCount=100
,等等。这样,作业每天只会处理一百个项目。输入文件 + 偏移量(即
linesToSkip
值)应该是一个作业参数,用于生成不同的作业实例,以便在失败时可以重新启动。

我个人会避免这种情况,如果可能的话,宁愿拆分文件。完成后,我将每天启动一个作业实例,并将文件分区作为作业参数。

© www.soinside.com 2019 - 2024. All rights reserved.