我是 Spring Batch 的新手,仍在学习,我使用 IteratorItemReader 、自定义处理器和自定义编写器进行批量配置,如下所示,
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Value("${inputFile.location}")
private String inputFile;
@Bean
public Job testJob() throws IOException {
return jobBuilderFactory.get("testJob")
.incrementer(new RunIdIncrementer())
.start(testStep())
.listener(new JobListener())
.build();
}
@Bean
public Step testStep() throws IOException {
return stepBuilderFactory.get("testStep")
.<File, File>chunk(1)
.reader(testReader())
.processor(testProcessor())
.writer(testWriter())
.taskExecutor(threadPoolTaskExecutor())
.build();
}
@Bean
public ItemReader<File> testReader() throws IOException {
List<File> files = Files.walk(Paths.get(inputFile), 1)
.filter(Files::isRegularFile)
.map(Path::toFile)
.collect(Collectors.toList());
return new IteratorItemReader<>(files);
}
@Bean
public CustomProcessor testProcessor() {
return new CustomProcessor();
}
@Bean
public CustomWriter testWriter() {
return new CustomWriter();
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(4);
executor.initialize();
return executor;
}
这里 testReader() 将检查给定的输入路径并将所有文件列出到一个 List 中,然后返回 IteratorItemReader,然后在处理器中发生业务逻辑。
使用多线程如果输入位置中有多个文件(多个),一切正常,我没有收到任何错误,但是,
问题陈述:假设输入位置只有一个文件(例如:C:/User/documents/abc.txt),一个线程将完全处理该文件,一切正常,但最终我得到以下异常,
ERROR - Encountered an error executing step testStep in job testJob
java.util.NoSuchElementException: null
at java.util.ArrayList$Itr.next(ArrayList.java:864)
at org.springframework.batch.item.support.IteratorItemReader.read (IteratorItemReader.java:70)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead (SimpleChunk Provider.java:99)
at org.springframework.batch.core.step.item.SimpleChunkProvider.read (SimpleChunkProvider.java:180)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration (SimpleChunk Provider.java:126)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult (RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal (RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunk Provider.provide (SimpleChunkProvider.java:118)
at org.springframework.batch.core.step.item. ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction (TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support. Transaction Template.execute(Transaction Template.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext (TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration (StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run (TaskExecutorRepeatTemplate.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
发生此异常只是因为多线程,当我尝试查看 IteratorItemReader 类行号 70 时,我在下面的代码中找到了这个,
if (iterator.hasNext())
return iterator.next();
else
return null; // end of data
解决此问题的最佳解决方案是什么,请提供您的意见,
提前致谢。
任何建议都会有帮助。
直接将其包裹在
SynchronizedItemStreamReader
中是行不通的,因为这需要 ItemStreamReader
,但 IteratorItemReader
不是 ItemStreamReader
。
我认为最好的出路是创建一个自定义的SynchronizedIteratorItemReader
。它与 IteratorItemReader
完全相同,只是 read()
方法应标记为同步。