我们有包含 1 亿条记录的大型 csv 文件,并使用 Spring Batch 通过使用“SystemCommandTasklet”拆分包含 100 万条记录的文件来加载、读取和写入数据库。下面是片段,
@Bean
@StepScope
public SystemCommandTasklet splitFileTasklet(@Value("#{jobParameters[filePath]}") final String inputFilePath) {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
final File file = BatchUtilities.prefixFile(inputFilePath, AppConstants.PROCESSING_PREFIX);
final String command = configProperties.getBatch().getDataLoadPrep().getSplitCommand() + " " + file.getAbsolutePath() + " " + configProperties.getBatch().getDataLoad().getInputLocation() + System.currentTimeMillis() / 1000;
tasklet.setCommand(command);
tasklet.setTimeout(configProperties.getBatch().getDataLoadPrep().getSplitCommandTimeout());
executionContext.put(AppConstants.FILE_PATH_PARAM, file.getPath());
return tasklet;
}
和批量配置:
batch:
data-load-prep:
input-location: /mnt/mlr/prep/
split-command: split -l 1000000 --additional-suffix=.csv
split-command-timeout: 900000 # 15 min
schedule: "*/60 * * * * *"
lock-at-most: 5m
通过上述配置,我可以成功读取负载并写入数据库。但是,发现以下代码片段的一个错误,即分割文件后,只有第一个文件会有标题,但下一个分割的文件在第一行没有听者。因此,我必须禁用或避免 FlatFileItemReader(CSVReader) 的linesToSkip(1) 配置。
@Configuration
public class DataLoadReader {
@Bean
@StepScope
public FlatFileItemReader<DemographicData> demographicDataCSVReader(@Value("#{jobExecutionContext[filePath]}") final String filePath) {
return new FlatFileItemReaderBuilder<DemographicData>()
.name("data-load-csv-reader")
.resource(new FileSystemResource(filePath))
.linesToSkip(1) // Need to avoid this from 2nd splitted file onwards as splitted file does not have headers
.lineMapper(lineMapper())
.build();
}
public LineMapper<DemographicData> lineMapper() {
DefaultLineMapper<DemographicData> defaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("id", "mdl65DecileNum", "mdl66DecileNum", "hhId", "dob", "firstName", "middleName",
"lastName", "addressLine1", "addressLine2", "cityName", "stdCode", "zipCode", "zipp4Code", "fipsCntyCd",
"fipsStCd", "langName", "regionName", "fipsCntyName", "estimatedIncome");
defaultLineMapper.setLineTokenizer(lineTokenizer);
defaultLineMapper.setFieldSetMapper(new DemographicDataFieldSetMapper());
return defaultLineMapper;
}
}
注意:加载程序在加载时不应跳过第二个文件的第一行。
提前谢谢您。感谢任何建议。
我会使用以下命令在
SystemCommandTasklet
中执行此操作:
tail -n +2 data.csv | split -l 1000000 --additional-suffix=.csv
如果您确实想在 Spring Batch 作业中使用 Java 来完成此操作,您可以使用自定义读取器或过滤标头的项目处理器。但我不推荐这种方法,因为它为每个项目引入了额外的测试(考虑到输入文件中的行数较多,这可能会影响您的工作性能)。