重新启动失败的作业并不是再次处理失败的块数据

问题描述 投票:0回答:1

提供示例应用程序来重现问题,https://github.com/PSHREYASHOLLA/SamplebatchApplication

它是一个maven项目,所以你可以调用mvn install,它会创建arget\SamplebatchApplication-0.0.1-SNAPSHOT.jar。现在你可以像任何 springboot 应用程序(启用 Liquibase)一样启动它,java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar。

如果您看到 application.properties 文件,我们将其指向 postgres 数据库。我们所有的批量配置https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java.

请通过调用rest post API来启动批处理过程,http://localhost:8080/batch/trigger-or-resume-application-batch-job JSON body { "appRestartJobExecutionId": "" } 如果我们用空的appRestartJobExecutionId调用它,流程如下所示, com.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() --->com.example.postgresql.model.FebpApplicationJobServiceImpl.triggerApplicationBatchJob() --> 我们执行 JobLauncher.run()。现在,该作业将从 febp_emp_detail_test 中读取 50 条记录,作为 reader 的一部分,并作为 writer 的一部分将更新的记录写入 febp_emp_tax_detail_test。这是快乐的流动。

现在,如果您调用上述 API,它将处理部分数据,并在表中对 emp0001 到 emp0030 进行部分提交,这将发生在 febp_emp_tax_detail_test 中。处理 emp0032 记录时它将失败,如您在 https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationProcessor.java

中看到的
private static int counter=1;


if(item.getEmpId().equals("emp0032"))
{
    if(counter==1)
    {
        counter++;
        throw new Exception();
    }
}

计数器将为 1,并且将为记录 emp0032 抛出异常,并且作业进入 FAILED 状态。现在假设我重新启动服务器并使用失败的作业执行 ID 调用相同的 post API,它现在将调用 om.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() --->com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob( ) ---> jobOperator.restart(failedBatchExecutionId);

作业从下一个块开始处理记录,即 emp0036 等。它不是再次处理失败的块吗?这里重启的想法是对未处理的数据再次重新执行相同的作业。这是单线程作业。

spring-batch spring-batch-tasklet spring-batch-job-monitoring
1个回答
0
投票

它应该从最后一个正确处理的块重新启动(即失败的块将被重新处理)。您共享的代码中声明阅读器的方式对于重新启动功能的工作不正确。目前您有:

@Component
public class EmployeeTaxCalculationReader {


   public JdbcPagingItemReader<EmployeeDetail> getPagingItemReader() throws Exception {
      // ...
   }

}

然后在作业/步骤定义中,读者设置为

.reader(reader.getPagingItemReader())
:

@Bean(name="FEBP_EMP_TAX_CALCULATION")
public Job getBatchJob() throws Exception {
    ItemProcessor<EmployeeDetail, EmployeeTaxDetail> itemProcessor = appContext.getBean(EmployeeTaxCalculationProcessor.class);
    ItemWriter<EmployeeTaxDetail> itemWriter = appContext.getBean(EmployeeTaxCalculationWriter.class);
    EmployeeTaxCalculationReader reader = appContext.getBean(EmployeeTaxCalculationReader.class);
    Step step = new StepBuilder("FEBP_EMP_TAX_CALCULATION_STEP", jobRepository)
        .<EmployeeDetail, EmployeeTaxDetail>chunk(5, transactionManager)
        .reader(reader.getPagingItemReader()).processor(itemProcessor).writer(itemWriter).taskExecutor(actStmntTaskExecutor())
        .build();

    Job febpTaxCalculationJob = new JobBuilder("FEBP_EMP_TAX_CALCULATION", jobRepository).incrementer(new RunIdIncrementer()).start(step).build();
    return febpTaxCalculationJob;
  }

因此 reader 不是 Spring bean,因此 Spring Batch 不会在其周围创建代理作为

ItemStream
来保存执行上下文中的进度,这是重新启动时从上一个检查点恢复所需的。
JdbcPagingItemReader
应声明为 bean。您可以在此处找到示例。

© www.soinside.com 2019 - 2024. All rights reserved.