您好,我有一个集成流程,该流程将文件逐行拆分,将每一行转换为一个POJO,然后通过JDBC出站网关将该POJO插入db。
我希望能够在文件处理完成后发送一封电子邮件。我目前正在jdbcOutboundGateway之后发送到smtpFlow通道,但是这在每次数据库插入后都会发送一封电子邮件。
这是我当前的流量DSL
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcOutboundGateway(null))
.channel("smtpFlow")
.get();
[在jdbcOutboundGateway
中处理完所有文件后,如何使此流程仅发送一封电子邮件?
这是我的splitFile()
方法
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(true, false);
fs.setFirstLineAsHeader("IndividualScore");
return fs;
这是我的transformToIndividualScore
方法
@Transformer
private IndividualScore transformToIndividualScore(String payload) {
String[] values = payload.split(",");
IndividualScore is = new IndividualScore();
is.setScorecardDate(values[0]);
is.setVnSpId(values[1]);
is.setPrimaryCat(values[2]);
is.setSecondaryCat(values[3]);
is.setScore(Integer.parseInt(values[4]));
is.setActual(values[5]);
return is;
}
在句柄后添加.aggregate()
以将拆分结果组合回单个消息。
是解决我的问题的一种。
现在在我的false
上将迭代器标记为FileSplitter
,允许排序头。
更新后的splitFile()
在下面
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(false, false);
fs.setFirstLineAsHeader("IndividualScore");
fs.setApplySequence(true);
return fs;
}
我的直觉告诉我,默认的.aggregate()
发布策略必须是消息标题sequenceSize
==聚集的消息列表。
[在将FileSplitter
设置为iterator
的情况下创建true
时,sequenceSize
设置为0
,这将永远无法满足默认.aggregate()
的释放策略
但是,这使FileSplitter
使用List
将文件的所有行存储在内存中。聚合器还将另一个ArrayList
行存储在内存中。
是否有更好的解决方案来创建可处理END
FileMarker
的自定义聚合器,以允许使用迭代器拆分文件?
在@ArtemBilan的帮助下