Trying to implement the process indicator pattern in Spring Batch

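I am struggling to find an example of the process indicator pattern. So far, I have a source STUDENTS table with a STATUS field that indicates whether a record has already been processed. I am using a task executor for multi-threaded processing.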

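In my writer, I insert the processed records into a new PROCESSED_STUDENTS table and update the status of those records in the source STUDENTS table to Processed. I perform both operations in a single transaction block so that the changes can be rolled back if either one fails.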
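
One way the two statements described above could share a single transaction is to let them run inside the transaction that Spring Batch already opens around each chunk. The following is only a sketch under assumptions: the class name StudentStatusWriter is hypothetical, OutData is the output type from this question, the ItemWriter signature is the Spring Batch 4.x one, and the step's transaction manager is assumed to manage the same DataSource that the JdbcTemplate uses.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical writer: both statements go through JdbcTemplate, so they join the
// chunk transaction (assuming the step's transaction manager uses the same DataSource).
public class StudentStatusWriter implements ItemWriter<OutData> {

    private final JdbcTemplate jdbcTemplate;

    public StudentStatusWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(List<? extends OutData> items) {
        List<Object[]> insertArgs = new ArrayList<>();
        List<Object[]> updateArgs = new ArrayList<>();
        for (OutData item : items) {
            insertArgs.add(new Object[] {item.getId(), item.getName()});
            updateArgs.add(new Object[] {item.getId()});
        }
        // Copy the chunk into the target table
        jdbcTemplate.batchUpdate(
                "INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)", insertArgs);
        // Flip the process indicator on the source rows
        jdbcTemplate.batchUpdate(
                "UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?", updateArgs);
        // Any exception thrown here propagates to the step, which rolls back the chunk
    }
}
```

With this shape there is no manual commit or setAutoCommit call; whether the insert and the update are committed or rolled back together is decided by the step's chunk transaction.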

This does not work with JdbcPagingItemReader: some records are still unprocessed when the job finishes.

Can someone tell me what I am missing here?

Reader

@Bean
@StepScope
public ItemReader<SourceData> reader(DataSource dataSource) {
        
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ToBeProcessed");     
    
    JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
    reader.setName("Oracle_RCP");
    reader.setDataSource(dataSource);
    reader.setRowMapper(SourceData.rowMapper());        
    reader.setParameterValues(parameterValues);
    reader.setPageSize(100);        
    reader.setQueryProvider(getQueryProvider(new OraclePagingQueryProvider(), "SELECT ID, NAME, CREATED_TIME", "FROM STUDENTS", "WHERE STATUS = :status", "CREATED_TIME", Order.ASCENDING));
    reader.setSaveState(false);     
    
    try {
        reader.afterPropertiesSet();
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    
    return reader;
}

public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select, String from, String where, String sortKey, Order order) {
    queryProvider.setSelectClause(select);
    queryProvider.setFromClause(from);
    
    if (where != null) {
        queryProvider.setWhereClause(where);
    }
    
    Map<String, Order> sortConfiguration = new HashMap<>();
    sortConfiguration.put(sortKey, order);
    queryProvider.setSortKeys(sortConfiguration);
    
    return queryProvider;
}

Processor

@Bean
@StepScope
public ItemProcessor<SourceData, OutData> processor(
        @Value("#{jobParameters['processDate']}") String processDate) {
    return new CustomItemProcessor(processDate);        
}

Writer

@SuppressWarnings("unchecked")
@Bean
public ItemWriter<OutData> writer(Utils utils) {        
    return OutDataList -> utils.batchOperation((List<OutData>) OutDataList, chunk); 
}

Job, step, and task executor

@Bean("MainJob")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {        
    return jobBuilderFactory.get("mainJob")   
      .incrementer(new RunIdIncrementer())          
      .flow(step)
      .end()
      .build();
}

@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {        
    return stepBuilderFactory.get("step")               
            .<SourceData, OutData> chunk(chunk)
            .reader(reader(null))
            .processor(processor(null))
            .writer(writer(null))   
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
@StepScope
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(50);
    executor.setCorePoolSize(25);       
    executor.setQueueCapacity(25);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-Executor");
    return executor;
}

The batchOperation method used in the writer inserts the processed records into the PROCESSED_STUDENTS table and updates the corresponding records in the STUDENTS source table:

public void batchOperation(List<OutData> outDataList, int batchSize) {
    try (
            Connection con = jdbcTemplate.getDataSource().getConnection(); 
            PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
            PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?");) {
        
        // Starting transaction block
        con.setAutoCommit(false);
        
        int i = 0;
        for (OutData argument : outDataList) {          
            psInsert.setLong(1, argument.getId());
            psInsert.setString(2, argument.getName());
                        
            psUpdate.setLong(1, argument.getId());
                        
            psInsert.addBatch();
            psUpdate.addBatch();
                        
            i++;
                        
            if (i % batchSize == 0) {
                psInsert.executeBatch();
                psUpdate.executeBatch();
            }               
        }
        
        // Execute the remaining statements when the record count is not a multiple of batchSize
        psInsert.executeBatch();
        psUpdate.executeBatch();
        
        // End transaction block, commit changes
        con.commit();

        // Setting it back to default true
        con.setAutoCommit(true);
        
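    } catch (Exception e) {
        // Log with the stack trace and rethrow so the failure is not silently swallowed
        log.error(e.getMessage(), e);
        throw new IllegalStateException("Batch insert/update failed", e);
    }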
}
java spring-boot spring-batch
1 Answer

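You cannot use JdbcPagingItemReader in this case, because you are changing the value that the query uses as its filter (STATUS) while the reader is still fetching pages. Paging does not work when the filtered result set changes between page queries.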
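
To illustrate the general effect, here is a small in-memory model. It is deliberately simplified and rests on an assumption: it uses plain offset-based paging (page N skips the first N * pageSize rows of whatever currently matches the filter), which is not necessarily how JdbcPagingItemReader builds its queries. The class and method names are hypothetical. It only shows how rows can be lost when the writer removes rows from the filtered set between two page reads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PagingOverMutatingFilter {

    /** Returns one page of the ids whose status is still "ToBeProcessed". */
    static List<Integer> readPage(String[] status, int page, int pageSize) {
        List<Integer> pending = new ArrayList<>();
        for (int id = 0; id < status.length; id++) {
            if ("ToBeProcessed".equals(status[id])) {
                pending.add(id);
            }
        }
        // Offset paging: skip page * pageSize rows of the CURRENT filtered result
        int from = Math.min(page * pageSize, pending.size());
        int to = Math.min(from + pageSize, pending.size());
        return new ArrayList<>(pending.subList(from, to));
    }

    /** Runs the read/update loop and returns how many rows were never processed. */
    static int run(int rows, int pageSize) {
        String[] status = new String[rows];
        Arrays.fill(status, "ToBeProcessed");

        int page = 0;
        while (true) {
            List<Integer> items = readPage(status, page, pageSize);
            if (items.isEmpty()) {
                break; // the reader believes it has reached the end
            }
            for (int id : items) {
                status[id] = "Processed"; // the "writer" flips the indicator
            }
            page++;
        }

        int remaining = 0;
        for (String s : status) {
            if ("ToBeProcessed".equals(s)) {
                remaining++;
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        // Prints "4 of 10 rows left unprocessed"
        System.out.println(run(10, 2) + " of 10 rows left unprocessed");
    }
}
```

In this model every processed page shrinks the filtered set, so the next offset jumps over rows that were never read.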

Just use JdbcCursorItemReader instead.
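
A minimal sketch of what that swap might look like for the reader in the question, using the Spring Batch 4.x setter API. The class name CursorReaderConfig and the reader name are hypothetical, SourceData.rowMapper() is taken from the question, and the SQL simply mirrors the original query. Because the step in the question is multi-threaded and JdbcCursorItemReader is not thread-safe, the sketch wraps it in SynchronizedItemStreamReader; treat that wrapper as an assumption about what the setup needs, not as part of the answer above.

```java
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CursorReaderConfig {

    @Bean
    public SynchronizedItemStreamReader<SourceData> reader(DataSource dataSource) throws Exception {
        // The cursor is opened once; rows are streamed from that single result set
        JdbcCursorItemReader<SourceData> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setName("studentsCursorReader"); // hypothetical name
        cursorReader.setDataSource(dataSource);
        cursorReader.setSql(
                "SELECT ID, NAME, CREATED_TIME FROM STUDENTS WHERE STATUS = ? ORDER BY CREATED_TIME");
        cursorReader.setPreparedStatementSetter(ps -> ps.setString(1, "ToBeProcessed"));
        cursorReader.setRowMapper(SourceData.rowMapper());
        cursorReader.setSaveState(false); // same setting as the original reader
        cursorReader.afterPropertiesSet();

        // Serialize read() calls so several step threads can share the one cursor
        SynchronizedItemStreamReader<SourceData> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(cursorReader);
        return reader;
    }
}
```

Returning the concrete SynchronizedItemStreamReader type rather than ItemReader lets the step see that the bean is also an ItemStream, so the cursor is opened and closed with the step.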

See also: (Spring Batch) Not all records are processed
