Camel 3.21 & Spring Boot 2.7.15
/**
 * Defines three routes: a timer (configured with {@code repeatCount=1}) that forwards to
 * {@code direct:route-1}, plus the two direct routes {@code route-1} and {@code route-2}.
 *
 * <p>route-1 puts 300 generated documents in the body, splits and re-aggregates them, then
 * enriches the aggregated list with the reply of {@code direct:route-2}. route-2 puts 500
 * generated documents in the body and sends them through the same split/aggregate steps.
 *
 * @throws Exception as declared by the method signature
 */
public void configure() throws Exception {
// Flexible strategy: picks the message body (typed as Document) from each exchange and
// accumulates the picked values in an ArrayList.
AggregationStrategy arrayListStrategy = AggregationStrategies.flexible(Document.class)
.accumulateInCollection(ArrayList.class)
.pick(body());
// Test-data factory: builds `size` documents of the given type, with ids "id1" .. "id<size>".
BiFunction<String, Integer, List<Document>> generate = (type, size) ->
IntStream.range(1, size +1)
.mapToObj(i -> new Document("type", type).append("id", "id" + i))
.toList();
// Entry point: the timer simply hands over to route-1.
from("timer://timer?repeatCount=1")
.to("direct:route-1");
from("direct:route-1")
.process(exchange -> {
// Replace the body with 300 generated "type-1" documents.
var documents = generate.apply("type-1", 300);
exchange.getMessage().setBody(documents);
})
// Split the list element by element (streaming mode) and regroup every element under the
// single constant key `true`; the group is completed via completionTimeout(3000).
// NOTE(review): everything chained after aggregate(...) appears to belong to the aggregator's
// output rather than to the calling route — the reported log lines for these steps come from
// "Aggregator" threads, not from the timer thread. Confirm against the Camel Aggregate EIP docs.
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
// Call route-2 and merge its reply into the current body with ConcatAllAggregationStrategy.
// NOTE(review): presumably direct:route-2 hands control back here before its own aggregation
// has completed, which would explain why the size logged below is already 800 — verify.
.enrich("direct:route-2", new ConcatAllAggregationStrategy())
.log(LoggingLevel.DEBUG, LOG_NAME,"route-1 : ${body.size()}");
from("direct:route-2")
.process(exchange -> {
// Replace the body with 500 generated "type-2" documents.
var documents = generate.apply("type-2", 500);
exchange.getMessage().setBody(documents);
})
// Same split / aggregate-by-constant-key / completionTimeout(3000) steps as in route-1.
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
.process(exchange -> {
// Unchecked cast: the body is expected to be the aggregated document list at this point.
var body = (List<Document>)exchange.getIn().getBody();
LOGGER.debug("route-2.processor : {}", body.size());
})
.log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}");
}
private class ConcatAllAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange == null) {
return oldExchange;
}
var oldBody = (List<Document>)oldExchange.getIn().getBody();
var newBody = (List<Document>)newExchange.getIn().getBody();
var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
oldExchange.getIn().setBody(all);
return oldExchange;
}
}
程序输出如下:
DEBUG [][Camel (camel-1) thread #4 - Aggregator] o.a.c.s.CamelLogger: route-1 : 800
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
为什么route-2的日志出现在route-1的日志之后? 我认为route-2应该在调用enrich()聚合策略之前完全执行。
如果我删除这一行
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
,它的行为符合预期,我得到:
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-1 : 800
如何在使用
split(body()).streaming()
和 .aggregate()
的同时实现此结果?
对于感兴趣的人,以下代码可以按预期工作。我放弃了使用 enrich(),改为使用 recipientList 并实现了新的聚合策略。
// Entry point: the timer hands its exchange to both producer routes via a recipient list.
from("timer://timer?repeatCount=1")
    .recipientList(simple("seda:route-1,seda:route-2"))
    .end();

// route-1: generate 300 "type-1" documents, split them, regroup them into one list and
// forward that list to the shared aggregation route.
from("seda:route-1")
    .process(ex -> ex.getMessage().setBody(generate.apply("type-1", 300)))
    .split(body())
        .streaming()
    .aggregate(constant(true), arrayListStrategy)
        .completionTimeout(3000)
    .to("seda:aggregate");

// route-2: same pipeline with 500 "type-2" documents, plus logging of the regrouped list size
// before it is forwarded to the shared aggregation route.
from("seda:route-2")
    .process(ex -> ex.getMessage().setBody(generate.apply("type-2", 500)))
    .split(body())
        .streaming()
    .aggregate(constant(true), arrayListStrategy)
        .completionTimeout(3000)
    .process(ex -> {
        List<Document> regrouped = (List<Document>) ex.getIn().getBody();
        LOGGER.debug("route-2.processor : {}", regrouped.size());
    })
    .log(LoggingLevel.DEBUG, LOG_NAME, "route-2 : ${body.size()}")
    .to("seda:aggregate");

// Final step: wait for two lists (completionSize(2)) and concatenate them.
from("seda:aggregate")
    .aggregate(constant(true), new ConcatAllAggregationStrategy())
        .completionSize(2)
    .log(LoggingLevel.DEBUG, LOG_NAME, " => 3 - body.size : ${body.size}");
private class ConcatAllAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
var newBody = (List<Document>)newExchange.getIn().getBody();
List<Document> oldBody = null;
if (oldExchange == null) {
oldBody = newBody;
newExchange.getIn().setBody(oldBody);
return newExchange;
} else {
oldBody = (List<Document>)oldExchange.getIn().getBody();
}
var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
oldExchange.getIn().setBody(all);
return oldExchange;
}
}
程序输出如下:
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #11 - Aggregator] o.a.c.s.CamelLogger: => 3 - body.size : 800