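I am struggling to find an example of the process indicator pattern. So far, I have a source STUDENTS table with a STATUS column that indicates whether a record has been processed. I am using a task executor for multi-threaded processing.
In my writer, I insert the processed records into a new PROCESSED_STUDENTS table and update the status of the processed records in the source STUDENTS table to Processed. I perform both operations in a single transaction block so that the changes can be rolled back if either one fails.
This does not work with JdbcPagingItemReader: some records are still unprocessed when the job finishes.
Can someone tell me what I am missing here?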
Reader
@Bean
@StepScope
public ItemReader<SourceData> reader(DataSource dataSource) {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ToBeProcessed");
    JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
    reader.setName("Oracle_RCP");
    reader.setDataSource(dataSource);
    reader.setRowMapper(SourceData.rowMapper());
    reader.setParameterValues(parameterValues);
    reader.setPageSize(100);
    reader.setQueryProvider(getQueryProvider(
            new OraclePagingQueryProvider(),
            "SELECT ID, NAME, CREATED_TIME",
            "FROM STUDENTS",
            "WHERE STATUS = :status",
            CREATED_TIME,
            Order.ASCENDING));
    reader.setSaveState(false);
    try {
        reader.afterPropertiesSet();
    } catch (Exception e) {
        // Pass the exception itself so the logger prints the stack trace
        log.error(e.getMessage(), e);
    }
    return reader;
}

public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select,
        String from, String where, String sortKey, Order order) {
    queryProvider.setSelectClause(select);
    queryProvider.setFromClause(from);
    if (where != null) {
        queryProvider.setWhereClause(where);
    }
    Map<String, Order> sortConfiguration = new HashMap<>();
    sortConfiguration.put(sortKey, order);
    queryProvider.setSortKeys(sortConfiguration);
    return queryProvider;
}
Processor
@Bean
@StepScope
public ItemProcessor<SourceData, OutData> processor(
        @Value("#{jobParameters['processDate']}") String processDate) {
    return new CustomItemProcessor(processDate);
}
Writer
@SuppressWarnings("unchecked")
@Bean
public ItemWriter<OutData> writer(Utils utils) {
    return outDataList -> utils.batchOperation((List<OutData>) outDataList, chunk);
}
Job, step, and task executor
@Bean("MainJob")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {
    return jobBuilderFactory.get("mainJob")
            .incrementer(new RunIdIncrementer())
            .flow(step)
            .end()
            .build();
}

@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
    return stepBuilderFactory.get("step")
            .<SourceData, OutData> chunk(chunk)
            .reader(reader(null))
            .processor(processor(null))
            .writer(writer(null))
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
@StepScope
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(50);
    executor.setCorePoolSize(25);
    executor.setQueueCapacity(25);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-Executor");
    return executor;
}
The batchOperation method used in the writer inserts the processed records into the PROCESSED_STUDENTS table and updates the processed records in the source STUDENTS table:
public void batchOperation(List<OutData> outDataList, int batchSize) {
    try (
            Connection con = jdbcTemplate.getDataSource().getConnection();
            PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
            PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?")) {
        // Start the transaction block
        con.setAutoCommit(false);
        int i = 0;
        for (OutData argument : outDataList) {
            psInsert.setLong(1, argument.getId());
            psInsert.setString(2, argument.getName());
            psUpdate.setLong(1, argument.getId());
            psInsert.addBatch();
            psUpdate.addBatch();
            i++;
            // Flush both batches every batchSize records
            if (i % batchSize == 0) {
                psInsert.executeBatch();
                psUpdate.executeBatch();
            }
        }
        // Execute the remaining batch when the record count is not a multiple of batchSize
        psInsert.executeBatch();
        psUpdate.executeBatch();
        // End the transaction block and commit the changes
        con.commit();
        // Restore the default auto-commit mode
        con.setAutoCommit(true);
    } catch (Exception e) {
        // Note: the exception is only logged here, not rethrown
        log.error(e.getMessage(), e);
    }
}
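You cannot use JdbcPagingItemReader in this case, because you are changing the value of the column used as the filter (STATUS) while the step is running. Paging will not work reliably.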
Just use JdbcCursorItemReader instead.
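As a rough illustration of that suggestion, the reader bean from the question could be rewritten along the following lines. This is only a sketch under a few assumptions: the class name CursorReaderConfig and the reader name studentsCursorReader are made up for the example, SourceData.rowMapper() is taken from the question, and the Spring Batch version is assumed to be the 4.x line that the question's JobBuilderFactory implies. Because JdbcCursorItemReader is not thread-safe and the step in the question uses a task executor, the sketch wraps it in a SynchronizedItemStreamReader so that reads are serialized.

```java
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; SourceData is the type from the question.
@Configuration
public class CursorReaderConfig {

    @Bean
    @StepScope
    public ItemStreamReader<SourceData> reader(DataSource dataSource) {
        JdbcCursorItemReader<SourceData> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setName("studentsCursorReader"); // assumed name
        cursorReader.setDataSource(dataSource);
        // One query, opened once; rows are streamed from this single result set
        cursorReader.setSql(
                "SELECT ID, NAME, CREATED_TIME FROM STUDENTS WHERE STATUS = ? ORDER BY CREATED_TIME");
        cursorReader.setPreparedStatementSetter(ps -> ps.setString(1, "ToBeProcessed"));
        cursorReader.setRowMapper(SourceData.rowMapper());
        // Same choice as in the question: no restart state is saved
        cursorReader.setSaveState(false);

        // JdbcCursorItemReader is not thread-safe, so serialize access
        // when the step runs with a task executor
        SynchronizedItemStreamReader<SourceData> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(cursorReader);
        return reader;
    }
}
```

Returning ItemStreamReader rather than ItemReader lets the step register the bean as a stream, so the cursor is opened and closed with the step. The synchronized wrapper only serializes the read calls; processing and writing still run on the executor's threads.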