Why is an exception thrown in Spring Batch's AsyncItemProcessor caught by the SkipListener's onSkipInWrite method?


I am writing a Spring Boot application that starts up, collects millions of database entries, converts them into a new condensed JSON format, and then sends them all to a GCP PubSub topic. I am trying to use Spring Batch for this, but I am running into trouble making the process fault tolerant. The database is full of data-quality issues, and sometimes my JSON conversion fails. When a failure happens, I don't want the job to exit immediately; I want it to keep processing as many records as it can and, before it finishes, report exactly which records failed, so that I and/or my team can inspect those problematic database entries.

To accomplish this, I tried to use Spring Batch's SkipListener interface. But since I am also using an AsyncItemProcessor and an AsyncItemWriter in the processing step, even when the exception occurs during processing it is the SkipListener's onSkipInWrite() method that catches it, not the onSkipInProcess() method. And unfortunately, onSkipInWrite() has no access to the original database entity, so I cannot store its ID in my list of problematic DB entries.

Did I configure something wrong? Is there another way to get hold of the object from the reader whose AsyncItemProcessor processing step failed?

Here is what I have tried...

I have a singleton Spring component where I keep count of how many DB entries were processed successfully, plus up to 20 of the problematic database entries.

@Component
@Getter //lombok
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress() { processed++; }
    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailures.add(failure);
    }

    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DbProjection dbData;
    }
}

And I have a Spring Batch skip listener that is supposed to catch the failures and update my status component accordingly:

@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DbProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DbProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        //This is getting called instead!! Even though the exception happened during processing :(
        //But I have no access to the original DbProjection data here, and messageFuture.get() gives me null.
    }
}

Then I configured my job like this:

@Configuration
public class ConversionBatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        //Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                    .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
spring-boot spring-batch skip fault-tolerance itemprocessor
1 Answer

1 vote

This is because the Future returned by the AsyncItemProcessor is only unwrapped in the AsyncItemWriter, so any exception that might occur at that point is seen as a write exception, not a processing exception. That is why onSkipInWrite is called instead of onSkipInProcess.
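This deferral can be seen in plain java.util.concurrent terms, with no Spring involved: a task that throws inside the executor does not surface its exception until Future.get() is called, which in this pipeline happens inside the AsyncItemWriter. A minimal sketch:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DeferredException {

    // Returns the message of the processing error as observed at "write" time.
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // "Processing": the task throws as soon as it runs...
            Future<String> future = pool.submit((Callable<String>) () -> {
                throw new IllegalStateException("bad DB data");
            });
            // ...but the exception only surfaces here, when the Future is
            // unwrapped -- the moment the batch framework attributes to writing.
            future.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caught at get(): " + demo());
    }
}
```

From the step's point of view the processor "succeeded" (it returned a Future immediately), so the skip machinery can only classify the later failure as a write error.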

This is actually a known limitation of this pattern, which is documented in the Javadoc of AsyncItemProcessor. Here is an excerpt:

Because the Future is typically unwrapped in the ItemWriter,
there are lifecycle and stats limitations (since the framework doesn't know 
what the result of the processor is).

While not an exhaustive list, things like StepExecution.filterCount will not
reflect the number of filtered items and 
itemProcessListener.onProcessError(Object, Exception) will not be called.

The Javadoc says the list is not exhaustive, and the side effect you are experiencing with the SkipListener is one of those limitations.
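A common workaround (my sketch, not part of the answer above) is to handle the failure inside the async delegate itself: catch the conversion exception while the original item is still in scope, record it, and return null so the item is treated as filtered rather than skipped. The mechanics, again in plain java.util.concurrent — `convert`, `processAsync`, and the `failures` list here are hypothetical stand-ins for the question's converter and ProcessStatus:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CatchInsideProcessor {

    // Stand-in for the question's ProcessStatus bookkeeping; synchronized
    // because the async processor runs on multiple threads.
    static final List<String> failures = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for DbProjectionToJsonMessageConverter: throws on bad data.
    static String convert(String dbRow) {
        if (dbRow.contains("bad")) {
            throw new IllegalStateException("cannot convert " + dbRow);
        }
        return "{\"row\":\"" + dbRow + "\"}";
    }

    // The decorator idea: the task catches the failure while it still holds
    // the original item, logs it, and returns null (a filtered item), so
    // Future.get() never throws on the write side.
    static Future<String> processAsync(ExecutorService pool, String dbRow) {
        return pool.submit(() -> {
            try {
                return convert(dbRow);
            } catch (Exception e) {
                failures.add(dbRow + ": " + e.getMessage());
                return null;
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<String>> results = new ArrayList<>();
        for (String row : List.of("good-1", "bad-2", "good-3")) {
            results.add(processAsync(pool, row));
        }
        for (Future<String> f : results) {
            String json = f.get(); // safe: failures were absorbed upstream
            if (json != null) {
                System.out.println(json);
            }
        }
        pool.shutdown();
        System.out.println("failures recorded: " + failures);
    }
}
```

In Spring Batch terms this means wrapping the delegate ItemProcessor rather than relying on the SkipListener; note that whether the AsyncItemWriter silently drops a null unwrapped result may depend on the Spring Batch version, so the delegate writer may still need its own null check.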
