我正在使用apache camel和spring boot来实现两个表之间的集成流程。源表包含1000多条记录。我想要做的是在对源数据进行一些修改之后,这些数据应该插入到同一数据库中的另一个表中。我坚持使用数据插入阶段。
<camelContext id="Integrator" xmlns="http://camel.apache.org/schema/spring">
<route id="hello">
<from id="timer" uri="timer:test?period={{timer.period}}"/>
<setBody id="query">
<constant>SELECT * FROM abc WHERE code = 'MDV1'</constant>
</setBody>
<log id="log_1" message="log msg"/>
<to id="jdbc_con" uri="jdbc:dataSource"/>
<process id="changebody" ref="editPayload"/>
<log id="log_2" message="process row ${body}"/>
</route>
</camelContext>
更新:
这不是我想要的确切答案。但是这个流可以将记录从源表插入到目标。在此解决方案中,记录将逐个插入到目标表中。我想在最后阶段插入批量而不是逐个插入。
<camelContext id="Integrator"
xmlns="http://camel.apache.org/schema/spring">
<route id="data_transfer">
<from id="timer" uri="timer:abcStaging?period={{timer.period}}" />
<setBody id="select_query">
<constant>select * from abc</constant>
</setBody>
<to id="jdbc_con_select" uri="jdbc:dataSource" />
<split>
<simple>${body}</simple>
<process id="change_body" ref="editPayload" />
<to id="jdbc_con_insert" uri="sql:{{sql.abcStaging}}" />
<log id="log_1" message="Inserted abcStaging" />
</split>
</route>
</camelContext>
属性文件:
sql.abcStaging =insert into abcStaging (id, rate) values (:#id, :#rate)
editPayload Bean:
public class ChangePayload implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
LinkedHashMap linkedHashMap = (LinkedHashMap) exchange.getIn().getBody();
Map<String, Object> staging = new HashMap<>();
/* data changing logics */
staging.put("id", "id");
staging.put("rate", "rate");
exchange.getOut().setBody(staging);
}
}
如果您的主要要求是执行单个批量查询,则可能必须在聚合器中自行构建查询。
如果您的DBMS支持如下语法:
INSERT INTO T1 (F1, F2) Values (a1, b1), (a2,b2)
然后聚合器可以构建那个大字符串。 (如果您有(或者说)10,000行或100,000行,这可能是不可行的,因为它可能超过语句大小限制。)另一个缺点是需要构建知道数据类型的值子句...不确定是否一个人可以参数化这样的东西。