Does JpaCursorItemReader fetch only the chunk size per call, or the entire result set?


ItemReader code

    @Bean
    public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
        String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
        JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
        itemReader.setQueryString(jpqlQuery);
        itemReader.setEntityManagerFactory(entityManagerFactory);
        itemReader.setSaveState(true);
        try {
            itemReader.afterPropertiesSet();
        } catch (Exception e) {
            // Don't swallow configuration failures silently.
            log.error("Exception in item reader: " + e.getMessage(), e);
        }
        return itemReader;
    }
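As a side note, a slightly safer variant of the reader above (a sketch, not the original code): `JpaCursorItemReader` also supports named parameters via `setParameterValues`, which avoids baking the literals into the JPQL string. This assumes the same `entityManagerFactory` field as the original bean:

```java
@Bean
public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
    JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
    // Named parameters instead of hard-coded literals in the JPQL string.
    itemReader.setQueryString(
            "from ExtAssessments WHERE processStatus = :status AND type = :type");
    itemReader.setParameterValues(Map.of("status", "new", "type", "diabetic"));
    itemReader.setEntityManagerFactory(entityManagerFactory);
    itemReader.setSaveState(true);
    return itemReader;
}
```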

ItemProcessor code — maps the item to a DiabeticAssessment using ModelMapper. `extIdClassMap`: used for per-item error handling. `updateTargetTableStatusToProcessing`: updates the status to processing and, later, to completed.

    
    @Bean
    public ItemProcessor<ExtAssessments, DiabeticAssessment> diabeticAssessmentProcessor() {
        return item -> {
            DiabeticAssessment diabeticAssessment = mapper.map(item, DiabeticAssessment.class);
            log.info("set created by as data migration and created date as current data.");
            diabeticAssessment.setCreatedBy(CREATED_BY_DATA_MIGRATION);
            diabeticAssessment.setCreatedDatetime(Date.from(Instant.now()));
            log.info("update ext Assessment process status as processing");
            updateTargetTableStatusToProcessing(item);
            log.info("Item has been processed for ext assessment id: " + item.getId());
            int hashCode = diabeticAssessment.hashCode();
            extIdClassMap.put(String.valueOf(hashCode), item.getId());
            return diabeticAssessment;
        };
    }
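One caveat worth flagging in the processor above: keying `extIdClassMap` on `hashCode()` is not collision-safe, so two distinct items can silently overwrite each other's entry and the error log would then record the wrong `extId`. A minimal, self-contained sketch (using the well-known `"Aa"`/`"BB"` hash collision in Java) demonstrates the failure mode:

```java
import java.util.HashMap;
import java.util.Map;

public class HashKeyCollision {
    public static void main(String[] args) {
        // "Aa" and "BB" are the classic example of distinct Java strings
        // whose hashCode() values are identical (both 2112).
        Map<String, String> extIdClassMap = new HashMap<>();
        extIdClassMap.put(String.valueOf("Aa".hashCode()), "ext-id-1");
        extIdClassMap.put(String.valueOf("BB".hashCode()), "ext-id-2");
        // The second put overwrote the first: only one entry remains.
        System.out.println(extIdClassMap.size()); // prints 1
    }
}
```

A more robust correlation key would be something intrinsic to the item, e.g. carrying the source `id` on the target object or using an `IdentityHashMap` keyed on the item itself.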

ItemWriter code:


    @Transactional
    @Bean
    public ItemWriter<DiabeticAssessment> diabeticAssessmentWriter() {
        return items -> {
            Boolean isErrorFound;
            String errorMessage;
            for (DiabeticAssessment diabeticAssessment : items) {
                isErrorFound = false;
                errorMessage = "";
                log.info("diabetic assessment data processing: " + diabeticAssessment);
                try {
                    Map<String, Object> validate = isValidate(diabeticAssessment);
                    if ((Boolean) validate.get(IS_VALIDATE)) {
                        diabeticAssessmentRepository.save(diabeticAssessment);

                    } else {
                        isErrorFound = true;
                        errorMessage = validate.get(VALIDATION_ERROR_MESSAGE).toString();
                        //ERROR LOG SAVE METHOD for individual failed
                        log.info("ERROR JOB ID: --->>>  " + jobId + " STEP ID: " + stepId + " Validation error message: "
                                + validate.get(VALIDATION_ERROR_MESSAGE));
                    }

                } catch (Exception e) {
                    isErrorFound = true;
                    errorMessage = "Exception during insertion operation";
                    log.info("diabetic assessment data not processed for JOB ID:" + jobId + " And STEP ID: " + stepId);
                }
                if (isErrorFound) {
                    MigrationLogRequest migrationLog = MigrationLogRequest.builder()
                            .extId(extIdClassMap.get(String.valueOf(diabeticAssessment.hashCode())))
                            .jobId(jobId)
                            .stepId(stepId)
                            .fromTable(TABLE_EXT_ASSESSMENTS)
                            .toTable(TABLE_DIABETIC_ASSESSMENT)
                            .message(errorMessage)
                            .severity(DATA_MIGRATION_ERROR)
                            .createdAt(Instant.now())
                            .createdBy(CREATED_BY_DATA_MIGRATION)
                            .build();
                    migrationLogService.createMigrationLog(migrationLog);
                }
                log.info("assessment processing done for uuid: " + diabeticAssessment.getProfileId());
            }
            updateTargetTableStatusToCompleted();
            boolean isSuccessful = validateRecords();
           // if (!isSuccessful) {
                // TO DO
                //ERROR Message for chunk of data process failed
                //   loggerMethod(); //error item
           // } else {
                // loggerMethod(); //success item
          //  }
        };
    }

Step code

    @Bean
    public Step stepDiabeticAssessmentInfo() {
        return stepBuilder.get("stepDiabeticAssessmentInfo")
                .<ExtAssessments, DiabeticAssessment>chunk(300)
                .reader(extAssessmentsDiabeticReader())
                .processor(diabeticAssessmentProcessor())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(300)
                .writer(diabeticAssessmentWriter())
                .build();
    }

Here we can see that the chunk size is 300.

Does the JPQL item reader fetch only 300 rows at a time? Or does it fetch all the data on every call, which would hurt performance?

 String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";

If it fetches all the data every time, is there any JPQL or item-reader approach that fetches only the first 300 rows with process_status = 'new', then processes and saves them, without loading everything?

sql spring spring-batch jpql
1 Answer

Perhaps you can think of it as a paged query, where the chunk size represents how many items are read per "page", followed by the read-process-write flow; then the next page of chunk-size items is queried.

I hope I have understood you correctly and that this helps.
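To make the distinction concrete: `JpaCursorItemReader` (available since Spring Batch 4.3) is cursor-based, not page-based. It executes the query once on open and then iterates the JPA result `Stream`, so `read()` pulls one item at a time; the step's `chunk(300)` only sets the commit interval, i.e. how many items are handed to the writer per transaction. Roughly, a simplified sketch of what the reader does internally (fragment only; `entityManager` is assumed):

```java
// Simplified sketch of JpaCursorItemReader's open/read behaviour:
Stream<ExtAssessments> resultStream = entityManager
        .createQuery(
                "from ExtAssessments WHERE processStatus = 'new' AND type = 'diabetic'",
                ExtAssessments.class)
        .getResultStream();                 // one query; results consumed lazily as a cursor
Iterator<ExtAssessments> cursor = resultStream.iterator();

// Each read() effectively returns: cursor.hasNext() ? cursor.next() : null;
// chunk(300) governs only the commit interval, not how rows are fetched.
```

Whether rows are truly streamed from the database or materialized client-side depends on the JPA provider and JDBC driver fetch-size settings; some drivers buffer the whole result set by default unless a streaming fetch size is configured.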
