Spring Batch multithreading issue with IteratorItemReader


I'm new to Spring Batch and still learning. I have a batch configuration that uses an IteratorItemReader, a custom processor, and a custom writer, shown below:

@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Value("${inputFile.location}")
private String inputFile;

@Bean
public Job testJob() throws IOException {
    return jobBuilderFactory.get("testJob")
                .incrementer(new RunIdIncrementer())
                .start(testStep())
                .listener(new JobListener())
                .build();
}

@Bean
public Step testStep() throws IOException {
    return stepBuilderFactory.get("testStep")
                .<File, File>chunk(1)
                .reader(testReader())
                .processor(testProcessor())
                .writer(testWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .build();
}

@Bean
public ItemReader<File> testReader() throws IOException {
    List<File> files = Files.walk(Paths.get(inputFile), 1)
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .collect(Collectors.toList());

    return new IteratorItemReader<>(files);
}

@Bean
public CustomProcessor testProcessor() {
    return new CustomProcessor();
}

@Bean
public CustomWriter testWriter() {
    return new CustomWriter();
}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(6);
    executor.setQueueCapacity(4);
    executor.initialize();
    return executor;
}

Here testReader() scans the given input path, collects all the files into a List, and returns an IteratorItemReader over that list. The business logic then runs in the processor.

With multithreading, everything works when the input location contains multiple files, and I get no errors. However:

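Problem statement: when the input location contains only one file (e.g. C:/User/documents/abc.txt), one thread processes that file completely and everything works, but at the end I get the following exception: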

ERROR - Encountered an error executing step testStep in job testJob
java.util.NoSuchElementException: null
at java.util.ArrayList$Itr.next(ArrayList.java:864)
at org.springframework.batch.item.support.IteratorItemReader.read(IteratorItemReader.java:70)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99)
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

This exception only occurs with multithreading. When I looked at line 70 of the IteratorItemReader class, I found this code:

if (iterator.hasNext())
    return iterator.next();
else
    return null; // end of data
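
One plausible reading of the trace, sketched under assumptions: hasNext() and next() are two separate steps, so two worker threads can both see hasNext() return true for the single remaining file, and the second call to next() then throws. The class CheckThenActRace and its method reproduce() are hypothetical names, and the interleaving is replayed on one thread to make it deterministic. This is not Spring Batch code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class: replays the suspected interleaving of two worker threads.
class CheckThenActRace {

    // Returns true if the second next() call fails with NoSuchElementException.
    static boolean reproduce() {
        // One "file" only, as in the failing scenario from the question.
        List<String> files = new ArrayList<>(Collections.singletonList("abc.txt"));
        Iterator<String> iterator = files.iterator();

        // Both "threads" run the check before either one fetches an element.
        boolean threadASawElement = iterator.hasNext();
        boolean threadBSawElement = iterator.hasNext();

        if (threadASawElement) {
            iterator.next(); // thread A takes the only element
        }
        try {
            if (threadBSawElement) {
                iterator.next(); // thread B acts on a stale check
            }
            return false;
        } catch (NoSuchElementException e) {
            return true; // same exception type as in the stack trace
        }
    }
}
```

With many files the same window presumably exists only around the last element, which may be why the failure shows up reliably only in the single-file case.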

What is the best way to solve this? Please share your thoughts.

Thanks in advance.

Any suggestions would be helpful.

java multithreading spring-boot spring-batch nosuchelementexception
1 Answer

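Wrapping it directly in a SynchronizedItemStreamReader won't work, because that requires an ItemStreamReader, and IteratorItemReader is not an ItemStreamReader. I think the best way out is to create a custom SynchronizedIteratorItemReader. It is identical to IteratorItemReader, except that the read() method is marked synchronized.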
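
A minimal sketch of such a reader follows. It is an illustration, not library code: SynchronizedIteratorItemReader is a hypothetical class, and the local ItemReader interface is only a stand-in so the snippet compiles without Spring on the classpath. In a real project you would drop the stand-in and implement org.springframework.batch.item.ItemReader instead.

```java
import java.util.Iterator;

// Stand-in for org.springframework.batch.item.ItemReader (assumption: used here
// only so the sketch compiles on its own; use the Spring interface in practice).
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical reader: same hasNext()/next() logic as quoted in the question,
// but read() is synchronized so the check and the fetch happen as one atomic step.
class SynchronizedIteratorItemReader<T> implements ItemReader<T> {

    private final Iterator<T> iterator;

    SynchronizedIteratorItemReader(Iterable<T> iterable) {
        if (iterable == null) {
            throw new IllegalArgumentException("iterable must not be null");
        }
        this.iterator = iterable.iterator();
    }

    @Override
    public synchronized T read() {
        if (iterator.hasNext()) {
            return iterator.next();
        }
        return null; // end of data
    }
}
```

In the question's configuration, testReader() would then return new SynchronizedIteratorItemReader<>(files). As far as I know, Spring Batch 5.1 and later also provide a SynchronizedItemReader that can wrap any ItemReader, which may make a custom class unnecessary; that is worth verifying against the version you are using.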
