有没有办法在spring批处理中捕获SkipLimitExceedException

问题描述 投票:2回答:1

我正在使用multiResourceItemReader读取csv文件,并且我将跳过限制保持为10.当超出限制时我想要捕获SkipLimitExceedException并使用“Invalid csv”之类的消息抛出我自己的自定义异常,其中或我怎么抓住它?

try {
      log.info("Running job to insert batch fcm: {} into database.", id);
            jobLauncher
                    .run(importJob, new JobParametersBuilder()
                    .addString("fullPathFileName", TMP_DIR)
                    .addString("batch_fcm_id", String.valueOf(id))
                    .addLong("time",System.currentTimeMillis())
                    .toJobParameters());
        }
catch(...){...}

我不能在这里抓住它,是因为我使用MultiResourceItemReader并且异步过程不允许我在这里捕获它?

我的工作如下

@Bean(name = "fcmJob")
    Job importJob(@Qualifier(MR_ITEM_READER) Reader reader,
                  @Qualifier(JDBC_WRITER) JdbcBatchItemWriter jdbcBatchItemWriter,
                  @Qualifier("fcmTaskExecutor") TaskExecutor taskExecutor) {
        Step writeToDatabase = stepBuilderFactory.get("file-database")//name of step
                .<FcmIdResource, FcmIdResource>chunk(csvChunkSize) // <input as, output as>
                .reader(reader)
                .faultTolerant()
                .skipLimit(10)
                .skip(UncategorizedSQLException.class)
                .noSkip(FileNotFoundException.class)
                .writer(jdbcBatchItemWriter)
                .taskExecutor(taskExecutor)
                .throttleLimit(20)
                .build();

        return jobBuilderFactory.get("jobBuilderFactory") //Name of job builder factory
                .incrementer(new RunIdIncrementer())
                .start(writeToDatabase)
                .on("*")
                .to(deleteTemporaryFiles())
                .end()
                .build();
    }

我已经尝试过使用ItemReaderListener,SkipPolicy,SkipListener,但他们不能抛出异常,还有其他方法吗?

spring-boot spring-batch
1个回答
1
投票

您正在寻找的异常不会被作业抛出,您可以使用JobExecution#getAllFailureExceptions从作业执行中获取它。

所以在你的例子中,而不是做:

try {
    jobLauncher.run(job, new JobParameters());
} catch (Exception e) {
   //...
}

你应该做:

JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
List<Throwable> allFailureExceptions = jobExecution.getFailureExceptions();

在你的情况下,SkipLimitExceedException将是allFailureExceptions之一。

编辑:添加一个示例,显示SkipLimitExceedExceptionallFailureExceptions的一部分:

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item % 3 == 0) {
                throw new IllegalArgumentException("no multiples of three here! " + item);
            }
            return item;
        };
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .faultTolerant()
                .skip(IllegalArgumentException.class)
                .skipLimit(2)
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
        List<Throwable> allFailureExceptions = jobExecution.getAllFailureExceptions();
        for (Throwable failureException : allFailureExceptions) {
            System.out.println("failureException = " + failureException);
        }
    }

}

此示例打印:

item = 1
item = 2
item = 4
item = 5
item = 7
item = 8
failureException = org.springframework.batch.core.step.skip.SkipLimitExceededException: Skip limit of '2' exceeded
© www.soinside.com 2019 - 2024. All rights reserved.