我需要处理从两个不同文件读取的数据并需要存储在Mongo DB中。由于作业下的一个步骤不能有多个读取器,所以我想到了有两个不同的读取器,但是我们如何合并从两个不同步骤从两个不同文件读取的数据以及在哪一步中我们需要实现处理器和写入器。
我尝试实现一个分为两个不同步骤的流程,每个步骤处理从两个文件中读取记录。因为它们是完全不同的文件,其标头不同。
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1()) // where step1 handles the reading of one flatFile.
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3()) // where step3 handles the reading of another flatFile.
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
现在,对于第一个文件中的每条记录,我需要在第二个文件中查找 id 值,然后存储在数据库中。
file1.txt
--------------
Id| First Name| Last Name
file2.txt
-----------
id| Dept
file2.txt
--------------
id| First Name| Last Name| Dept
如何将从两个阅读器读取的数据合并到一个处理器下进行处理? 有人对在这种情况下应如何配置作业有任何建议吗?
不太确定如何使用 spring-batch 来做到这一点,但是使用嵌入式数据库(例如 H2)来做到这一点非常简单。假设将两个文件从
txt
转换为 csv
不是问题,您只需将这两个文件加载到两个表中并通过 ID
将它们连接起来
CREATE table_1 TEST AS SELECT * FROM CSVREAD('file1.csv');
CREATE table_2 TEST AS SELECT * FROM CSVREAD('file2.csv');
SELECT t1.first_name, t1.last_name, t2.dept
FROM table_1 t1
JOIN table_2 t2
ON t1.id = t2.id;
然后根据您的要求,您可以将它们写回文件或执行任何您需要的操作。