Apache Camel - 如何在 split(body()).streaming().aggregate 之后使用丰富()?

问题描述 投票:0回答:1

Camel 3.21 & String Boot 2.7.15

public void configure() throws Exception {

    AggregationStrategy arrayListStrategy = AggregationStrategies.flexible(Document.class)
            .accumulateInCollection(ArrayList.class)
            .pick(body());

    BiFunction<String, Integer, List<Document>> generate = (type, size) ->
            IntStream.range(1, size +1)
            .mapToObj(i -> new Document("type", type).append("id", "id" + i))
            .toList();

    from("timer://timer?repeatCount=1")
            .to("direct:route-1");

    from("direct:route-1")
            .process(exchange -> {
                var documents = generate.apply("type-1", 300);
                exchange.getMessage().setBody(documents);
            })
            .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
            .enrich("direct:route-2", new ConcatAllAggregationStrategy())
            .log(LoggingLevel.DEBUG, LOG_NAME,"route-1 : ${body.size()}");

    from("direct:route-2")
            .process(exchange -> {
                var documents = generate.apply("type-2", 500);
                exchange.getMessage().setBody(documents);
            })
            .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
            .process(exchange -> {
                var body = (List<Document>)exchange.getIn().getBody();
                LOGGER.debug("route-2.processor : {}", body.size());
            })
            .log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}");
}


private class ConcatAllAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (newExchange == null) {
            return oldExchange;
        }
        var oldBody = (List<Document>)oldExchange.getIn().getBody();
        var newBody = (List<Document>)newExchange.getIn().getBody();
        var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
        oldExchange.getIn().setBody(all);
        return oldExchange;
    }
}

程序输出如下:

DEBUG [][Camel (camel-1) thread #4 - Aggregator] o.a.c.s.CamelLogger: route-1 : 800
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500

为什么route-2的日志出现在route-1的日志之后? 我认为route-2应该在调用enrich()聚合策略之前完全执行。

如果我删除线

.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
,它的行为符合预期,我得到:

DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-1 : 800

如何在使用

split(body()).streaming()
.aggregate()
的同时实现此结果?

java multithreading apache-camel
1个回答
0
投票

对于那些感兴趣的人,以下代码可以按预期工作。我放弃使用丰富,而是实现了新的聚合策略以及收件人列表。

from("timer://timer?repeatCount=1")
    .recipientList(simple("seda:route-1,seda:route-2"))
    .end();

from("seda:route-1")
        .process(exchange -> {
            var documents = generate.apply("type-1", 300);
            exchange.getMessage().setBody(documents);
        })
        .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
        .to("seda:aggregate");

from("seda:route-2")
        .process(exchange -> {
            var documents = generate.apply("type-2", 500);
            exchange.getMessage().setBody(documents);
        })
        .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
        .process(exchange -> {
            var body = (List<Document>)exchange.getIn().getBody();
            LOGGER.debug("route-2.processor : {}", body.size());
        })
        .log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}")
        .to("seda:aggregate");

from("seda:aggregate")
        .aggregate(constant(true), new ConcatAllAggregationStrategy()).completionSize(2)
        .log(LoggingLevel.DEBUG, LOG_NAME," => 3 - body.size : ${body.size}");
        
private class ConcatAllAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        var newBody = (List<Document>)newExchange.getIn().getBody();
        List<Document> oldBody = null;
        if (oldExchange == null) {
            oldBody = newBody;
            newExchange.getIn().setBody(oldBody);
            return newExchange;
        } else {
            oldBody = (List<Document>)oldExchange.getIn().getBody();
        }
        var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
        oldExchange.getIn().setBody(all);
        return oldExchange;
    }
}
        

程序输出如下:

DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #11 - Aggregator] o.a.c.s.CamelLogger:  => 3 - body.size : 800

© www.soinside.com 2019 - 2024. All rights reserved.