如何在 java Spring Batch 应用程序的步骤之间传递数据

问题描述 投票:0回答:1

我有一个 Spring Batch 项目,可以通过两个不同的步骤读取两个 txt 文件(Cyties 和 Histories)。我使用 ItemsReaders,一切都很好,我什至可以将它们一起写入 json 格式的单个新文件中(这是期望的目标)。但在编写它们之前,我需要在项目处理器中对它们进行比较,以便出现匹配时。每个城市都有它的历史。我正在尝试设置 jobExecutionContext 以便从 process1 中检索我的历史列表,并将它们与 process2 中的城市列表进行比较。对我来说这是不可能实现的。你能帮我找出问题所在吗?

//这是我的batchCongig

@Configuration
public class BatchConfiguration {

  @Bean
  DataSource dataSource() {
    return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2).addScript("/org/springframework/batch/core/schema-h2.sql").generateUniqueName(
        true).build();
  }

  @Bean
  PlatformTransactionManager transactionManager(DataSource ds) {
    return new JdbcTransactionManager(ds);
  }

  @Bean
  public Job runjob(JobRepository jobRepository, PlatformTransactionManager txManager)
      throws Exception {
    return new JobBuilder("runJob", jobRepository)
        .start(step1(jobRepository, txManager))
        .next(step2(jobRepository, txManager))
        .build();
  }

  @Bean
  public Step step1(JobRepository jobRepository, PlatformTransactionManager txManager)
      throws Exception {
    return new StepBuilder("step1")
        .repository(jobRepository)
        .<Historique,Historique>chunk(100)
        .reader(readerStep1())
        .processor(processStep1())
        .writer(writerStep1())
        .listener(promotionListener())
        .transactionManager(txManager)
        .build();
  }

  @Bean
  public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[]{"histories"});
    return listener;
  }

  @Bean
  public Step step2(JobRepository jobRepository, PlatformTransactionManager txManager)
      throws Exception {
    return  new StepBuilder("step1")
        .repository(jobRepository)
        .<City,City>chunk(100)
        .reader(readerStep2())
        .processor(processStep2())
        .writer(writerStep2())
        .transactionManager(txManager)
        .build();
  }

//
//  @Bean
//  public Step stepCity(JobRepository jobRepository, PlatformTransactionManager txManager)
//      throws Exception {
//    return new StepBuilder("stepCity")
//        .repository(jobRepository)
//        .<City, City>chunk(100)
//        .reader(readerCity())
//        .processor(processorCity())
//        .writer(writerCity())
//        .transactionManager(txManager).build();
//  }

  @Bean
  public ItemReader<Historique> readerStep1()
      throws Exception {
    FlatFileItemReader<Historique> readerStep1 = new FlatFileItemReader<Historique>();
    readerStep1.setLineMapper(new DefaultLineMapper() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"mod", "date_eff", "typecom_av", "com_av", "tncc_av", "ncc_av", "nccenr_av", "libelle_av", "typecom_ap", "com_ap", "tncc_ap", "ncc_ap",
                             "nccenr_ap", "libelle_ap"});
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Historique>() {{
        setTargetType(Historique.class);
      }});
    }});
    readerStep1.setResource(new ClassPathResource("documents/HistoriqueCities.txt"));
    return readerStep1;
  }

  @Bean
  public ItemReader<City> readerStep2()
      throws Exception {
    FlatFileItemReader<City> readerStep2 = new FlatFileItemReader<City>();
    readerStep2.setLineMapper(new DefaultLineMapper() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"typecom", "com", "reg", "dep", "arr", "tncc", "ncc", "nccenr", "libelle", "can", "comparent"});
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<City>() {{
        setTargetType(City.class);
      }});
    }});
    readerStep2.setResource(new ClassPathResource("documents/Cities.txt"));
    return readerStep2;
  }

  @Bean
  public ProcessStep1 processStep1() {
    return new ProcessStep1();
  }

  @Bean
  public ProcessStep2 processStep2() {
    return new ProcessStep2() {
    };
  }

  @Bean
  public ItemWriter<Historique> writerStep1() {
    JsonFileItemWriter<Historique> writerStep1 = new JsonFileItemWriter<>(new FileSystemResource("src/main/java/output/testStep.json"),
        new JsonObjectMarshaller<Historique>() {
          @Override
          public String marshal(Historique object) {
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
              return mapper.writeValueAsString(object);
            }
            catch(JsonProcessingException e) {
              e.printStackTrace();
              return null;
            }
          }
        }) {
      @Override
      public void open(ExecutionContext executionContext)
          throws ItemStreamException {
        super.open(executionContext);
        try {
          getOutputState().write("[{Historique: }");
        }
        catch(IOException e) {
          throw new RuntimeException(e);
        }
      }
    };
    writerStep1.setResource(new FileSystemResource("src/main/java/output/testStep.json"));
    writerStep1.setJsonObjectMarshaller(new JsonObjectMarshaller<Historique>() {

      @Override
      public String marshal(Historique historique) {
        Map<String, String> mapStory = new LinkedHashMap<>();
        mapStory.put("parent", historique.getCom_av());
        mapStory.put("typeMod", historique.getTypecom_av());
        mapStory.put("typeModNumber", historique.getMod());
        mapStory.put("oldLabel", historique.getLibelle_ap());
        mapStory.put("Label", historique.getLibelle_av());
        mapStory.put("date_eff", historique.getDate_eff());
        List<ProcessStep1> historiqueList = new ArrayList<>();


        return mapStory.toString();
      }
    });
    return writerStep1;

  }

  @Bean
  public ItemWriter<City> writerStep2() {
    JsonFileItemWriter<City> writerStep2 = new JsonFileItemWriter<>(new FileSystemResource("src/main/java/output/testStep.json"),
        new JsonObjectMarshaller<City>() {
          @Override
          public String marshal(City object) {
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.enable(SerializationFeature.INDENT_OUTPUT);
              return mapper.writeValueAsString(object);
            }
            catch(JsonProcessingException e) {
              e.printStackTrace();
              return null;
            }
          }
        }) {

      @Override
      public void open(ExecutionContext executionContext)
          throws ItemStreamException {
        super.open(executionContext);
        try {
          getOutputState().write("[{City: }");
        }
        catch(IOException e) {
          throw new RuntimeException(e);
        }
      }
    };
    writerStep2.setResource(new FileSystemResource("src/main/java/output/testStep.json"));
    writerStep2.setAppendAllowed(true);
    writerStep2.setJsonObjectMarshaller(new JsonObjectMarshaller<City>() {
      @Override
      public String marshal(City city) {
        Map<String, String> mapCity = new LinkedHashMap<>();
        mapCity.put("parent", city.getCom());
        mapCity.put("typeMod", city.getTypecom());
        mapCity.put("typeModNumber", city.getLibelle());
        mapCity.put("oldLabel", city.getComparent());
        mapCity.put("Label", city.getCountry());
        System.out.println(mapCity.toString() + "test");
        return mapCity.toString();

      }
    });
    return writerStep2;
  }

// 现在是我的 ProcessStep1

@Component
public class ProcessStep1 implements ItemProcessor<Historique, Historique>, StepExecutionListener {

  private DataHolder dataHolder;
  private ChunkContext chunkContext;

  @Override
  public Historique process(Historique historique) throws Exception {

    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();

    List<Historique> historiqueList = new ArrayList<>();
    executionContext.put("histories", historiqueList);
    return historique;
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
    StepExecutionListener.super.beforeStep(stepExecution);
    dataHolder = new DataHolder();
    stepExecution.getJobExecution().getExecutionContext().put("histories", dataHolder);

  }
  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    DataHolder dataHolder = (DataHolder) stepExecution.getJobExecution().getExecutionContext().get("histories");
    return StepExecutionListener.super.afterStep(stepExecution);
  }
}

//和我的流程Step2

@Component
public class ProcessStep2 implements ItemProcessor<City, City>, StepExecutionListener {

  private DataHolder dataHolder;
  private ChunkContext chunkContext;

  @Override
  public City process(City city) throws Exception {
    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
    List<Historique> historiqueList = (List<Historique>) executionContext.get("histories");
    if(historiqueList == null) {
      historiqueList = new ArrayList<>();
    }
    city.setHistoriqueList(historiqueList);

return city;
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
    StepExecutionListener.super.beforeStep(stepExecution);
    DataHolder dataHolder = (DataHolder) stepExecution.getJobExecution().getExecutionContext().get("histories");
    stepExecution.getJobExecution().getExecutionContext().put("cities", dataHolder);
  }
  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return StepExecutionListener.super.afterStep(stepExecution);}

  }

//我尝试将数据保存在 DataHolder 类的内存中,以便能够传递它们,因为我不使用数据库

@Component
public class DataHolder implements Serializable {
  private Map<String, Object> dataMap;

  public DataHolder() {
    dataMap = new ConcurrentHashMap<>();
  }

  public Map<String, Object> getDataMap() {
    return dataMap;
  }

  public void setDataMap(Map<String, Object> dataMap) {
    this.dataMap = dataMap;
  }
java spring-batch executioncontext
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.