我正在学习春季批处理,并试图了解异常期间项目处理器的工作方式。
我正在从csv文件中的3条记录中读取数据,并将其处理并将其写入数据库。
我的csv文件
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doem
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
批处理配置,以3的块读取项目,并跳过限制2
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited()
.names(new String[] { "firstName", "lastName" }).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class);
}
}).build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1").<Person, Person> chunk(3).reader(reader()).processor(processor()).writer(writer).faultTolerant().skipLimit(2)
.skip(Exception.class).build();
}
}
我正在尝试通过在项目处理器中为一条记录手动引发Exception来模拟Exception
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
if (person.getLastName().equals("Doem"))
throw new Exception("DOOM");
return transformedPerson;
}
}
现在,按照跳过限制,当引发异常时,项目处理器将重新处理块并跳过引发错误的项目,并且项目写操作还将在DB中插入所有记录,只有一个异常记录除外。
这很好,因为我的处理器只是将小写字母转换为大写字母,并且可以多次运行而不会产生影响。
但是假设我的项目处理器正在调用Web服务并发送数据。成功调用Web服务后是否引发了异常。然后将再次处理块中的剩余数据(并再次调用Webservice)。我不想再次调用Web服务,因为这就像向Web服务发送重复数据,而Web服务系统无法识别重复数据。
如何处理这种情况。一种选择是“不跳过异常”,这意味着即使处理器调用了Web服务,我在块中的仍然一条记录也不会将其写入项目编写器。因此不正确。
其他选项块的大小应为1,这可能在处理数千个记录时效率不高。
还有哪些其他选择?
根据您的描述,您的项目处理器不是幂等的。但是,文档的Fault tolerance部分指出,使用容错步骤时,项目处理器应该是幂等的。这是摘录:
如果将步骤配置为容错的(通常通过使用跳过或重试处理),则所使用的任何ItemProcessor都应以幂等的方式实现。