我有一个 Spring Batch 项目,可以通过两个不同的步骤读取两个 txt 文件(Cyties 和 Histories)。我使用 ItemsReaders,一切都很好,我什至可以将它们一起写入 json 格式的单个新文件中(这是期望的目标)。但在编写它们之前,我需要在项目处理器中对它们进行比较,以便出现匹配时。每个城市都有它的历史。我正在尝试设置 jobExecutionContext 以便从 process1 中检索我的历史列表,并将它们与 process2 中的城市列表进行比较。对我来说这是不可能实现的。你能帮我找出问题所在吗?
@Configuration
public class BatchConfiguration {
@Bean
DataSource dataSource() {
return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2).addScript("/org/springframework/batch/core/schema-h2.sql").generateUniqueName(
true).build();
}
@Bean
PlatformTransactionManager transactionManager(DataSource ds) {
return new JdbcTransactionManager(ds);
}
@Bean
public Job runjob(JobRepository jobRepository, PlatformTransactionManager txManager)
throws Exception {
return new JobBuilder("runJob", jobRepository)
.start(step1(jobRepository, txManager))
.next(step2(jobRepository, txManager))
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager txManager)
throws Exception {
return new StepBuilder("step1")
.repository(jobRepository)
.<Historique,Historique>chunk(100)
.reader(readerStep1())
.processor(processStep1())
.writer(writerStep1())
.listener(promotionListener())
.transactionManager(txManager)
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{"histories"});
return listener;
}
@Bean
public Step step2(JobRepository jobRepository, PlatformTransactionManager txManager)
throws Exception {
return new StepBuilder("step1")
.repository(jobRepository)
.<City,City>chunk(100)
.reader(readerStep2())
.processor(processStep2())
.writer(writerStep2())
.transactionManager(txManager)
.build();
}
//
// @Bean
// public Step stepCity(JobRepository jobRepository, PlatformTransactionManager txManager)
// throws Exception {
// return new StepBuilder("stepCity")
// .repository(jobRepository)
// .<City, City>chunk(100)
// .reader(readerCity())
// .processor(processorCity())
// .writer(writerCity())
// .transactionManager(txManager).build();
// }
@Bean
public ItemReader<Historique> readerStep1()
throws Exception {
FlatFileItemReader<Historique> readerStep1 = new FlatFileItemReader<Historique>();
readerStep1.setLineMapper(new DefaultLineMapper() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[]{"mod", "date_eff", "typecom_av", "com_av", "tncc_av", "ncc_av", "nccenr_av", "libelle_av", "typecom_ap", "com_ap", "tncc_ap", "ncc_ap",
"nccenr_ap", "libelle_ap"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Historique>() {{
setTargetType(Historique.class);
}});
}});
readerStep1.setResource(new ClassPathResource("documents/HistoriqueCities.txt"));
return readerStep1;
}
@Bean
public ItemReader<City> readerStep2()
throws Exception {
FlatFileItemReader<City> readerStep2 = new FlatFileItemReader<City>();
readerStep2.setLineMapper(new DefaultLineMapper() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[]{"typecom", "com", "reg", "dep", "arr", "tncc", "ncc", "nccenr", "libelle", "can", "comparent"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<City>() {{
setTargetType(City.class);
}});
}});
readerStep2.setResource(new ClassPathResource("documents/Cities.txt"));
return readerStep2;
}
@Bean
public ProcessStep1 processStep1() {
return new ProcessStep1();
}
@Bean
public ProcessStep2 processStep2() {
return new ProcessStep2() {
};
}
@Bean
public ItemWriter<Historique> writerStep1() {
JsonFileItemWriter<Historique> writerStep1 = new JsonFileItemWriter<>(new FileSystemResource("src/main/java/output/testStep.json"),
new JsonObjectMarshaller<Historique>() {
@Override
public String marshal(Historique object) {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
return mapper.writeValueAsString(object);
}
catch(JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
}) {
@Override
public void open(ExecutionContext executionContext)
throws ItemStreamException {
super.open(executionContext);
try {
getOutputState().write("[{Historique: }");
}
catch(IOException e) {
throw new RuntimeException(e);
}
}
};
writerStep1.setResource(new FileSystemResource("src/main/java/output/testStep.json"));
writerStep1.setJsonObjectMarshaller(new JsonObjectMarshaller<Historique>() {
@Override
public String marshal(Historique historique) {
Map<String, String> mapStory = new LinkedHashMap<>();
mapStory.put("parent", historique.getCom_av());
mapStory.put("typeMod", historique.getTypecom_av());
mapStory.put("typeModNumber", historique.getMod());
mapStory.put("oldLabel", historique.getLibelle_ap());
mapStory.put("Label", historique.getLibelle_av());
mapStory.put("date_eff", historique.getDate_eff());
List<ProcessStep1> historiqueList = new ArrayList<>();
return mapStory.toString();
}
});
return writerStep1;
}
@Bean
public ItemWriter<City> writerStep2() {
JsonFileItemWriter<City> writerStep2 = new JsonFileItemWriter<>(new FileSystemResource("src/main/java/output/testStep.json"),
new JsonObjectMarshaller<City>() {
@Override
public String marshal(City object) {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
return mapper.writeValueAsString(object);
}
catch(JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
}) {
@Override
public void open(ExecutionContext executionContext)
throws ItemStreamException {
super.open(executionContext);
try {
getOutputState().write("[{City: }");
}
catch(IOException e) {
throw new RuntimeException(e);
}
}
};
writerStep2.setResource(new FileSystemResource("src/main/java/output/testStep.json"));
writerStep2.setAppendAllowed(true);
writerStep2.setJsonObjectMarshaller(new JsonObjectMarshaller<City>() {
@Override
public String marshal(City city) {
Map<String, String> mapCity = new LinkedHashMap<>();
mapCity.put("parent", city.getCom());
mapCity.put("typeMod", city.getTypecom());
mapCity.put("typeModNumber", city.getLibelle());
mapCity.put("oldLabel", city.getComparent());
mapCity.put("Label", city.getCountry());
System.out.println(mapCity.toString() + "test");
return mapCity.toString();
}
});
return writerStep2;
}
@Component
public class ProcessStep1 implements ItemProcessor<Historique, Historique>, StepExecutionListener {
private DataHolder dataHolder;
private ChunkContext chunkContext;
@Override
public Historique process(Historique historique) throws Exception {
ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
List<Historique> historiqueList = new ArrayList<>();
executionContext.put("histories", historiqueList);
return historique;
}
@Override
public void beforeStep(StepExecution stepExecution) {
StepExecutionListener.super.beforeStep(stepExecution);
dataHolder = new DataHolder();
stepExecution.getJobExecution().getExecutionContext().put("histories", dataHolder);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
DataHolder dataHolder = (DataHolder) stepExecution.getJobExecution().getExecutionContext().get("histories");
return StepExecutionListener.super.afterStep(stepExecution);
}
}
//和我的流程Step2
@Component
public class ProcessStep2 implements ItemProcessor<City, City>, StepExecutionListener {
private DataHolder dataHolder;
private ChunkContext chunkContext;
@Override
public City process(City city) throws Exception {
ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
List<Historique> historiqueList = (List<Historique>) executionContext.get("histories");
if(historiqueList == null) {
historiqueList = new ArrayList<>();
}
city.setHistoriqueList(historiqueList);
return city;
}
@Override
public void beforeStep(StepExecution stepExecution) {
StepExecutionListener.super.beforeStep(stepExecution);
DataHolder dataHolder = (DataHolder) stepExecution.getJobExecution().getExecutionContext().get("histories");
stepExecution.getJobExecution().getExecutionContext().put("cities", dataHolder);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return StepExecutionListener.super.afterStep(stepExecution);}
}
//我尝试将数据保存在 DataHolder 类的内存中,以便能够传递它们,因为我不使用数据库
@Component
public class DataHolder implements Serializable {
private Map<String, Object> dataMap;
public DataHolder() {
dataMap = new ConcurrentHashMap<>();
}
public Map<String, Object> getDataMap() {
return dataMap;
}
public void setDataMap(Map<String, Object> dataMap) {
this.dataMap = dataMap;
}