所有的人。
我目前的用例是,我需要调用路由中的一个端点,但在调用后继续使用原来的消息。我找到的方法是使用 multicast()
然后是 "聚合策略"。AggregationStrategies.useOriginal()
,即 UseOriginalAggregationStrategy()
要说明的是,我并没有将消息发送到多个端点,只是使用了这个方法。multicast()
将数据 "重置 "到之前的状态。
现在我遇到的问题是,我必须从内部传播一些信息。multicast()
到外面的路由上,并在以后使用它(如下例)。它试图用更新的 UseOriginalAggregationStrategy()
在这里,我只 "重置 "了 Message
但留下 Exchange
原样,这样我就可以更新属性,并且在更新后仍然可以拥有它们。multicast()
AggregationStrategy:
public class UseOriginalMessageAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange != null && oldExchange.getIn() != null) {
newExchange.setIn(oldExchange.getIn());
}
return newExchange;
}
}
UnitTest:
@Test
public void test() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:test")
.multicast(new UseOriginalMessageAggregationStrategy())
.process(exchange -> { // This could also be a 'to()', it doesn't make a difference
exchange.setProperty("c", 1);
exchange.getIn().setHeader("d", 1);
})
.end()
.log("Property a: ${exchangeProperty.a}")
.log("Header b: ${header.b}")
.log("Property c: ${exchangeProperty.c}")
.log("Header d: ${header.d}");
}
});
Exchange exchange = new ExchangeBuilder(camelContext)
.withProperty("a", 1)
.withHeader("b", 1)
.build();
camelContext.createProducerTemplate().send("direct:test", exchange);
}
现在我期待的是这个日志:
属性a: 1头b: 1属性c: 1个头d:
但实际发生的情况是:
属性a: 1个标题b: 1个属性c: 1个标题d: 1 标题d: 1
请注意,"Header d "仍然被设置了,尽管它属于在 multicast()
,现在应该已经被覆盖了。
经过调试,我注意到,发生这种情况是因为AggregationStrategy中的'oldExchange'实际上是空的,这时日志才有意义,因为被记录的正是'newExchange'。
现在我感到非常困惑。尽管'oldExchange'是空的,这意味着我们无法访问在这之前的Exchange。multicast()
జజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజ UseOriginalAggregationStrategy()
仍然能够得到确切的交换。
这里用上面的UnitTest举个例子。
@Test
public void test() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:test")
.multicast(AggregationStrategies.useOriginal()) // <- Using the camel AggregationStrategy now
.process(exchange -> {
exchange.setProperty("c", 1);
exchange.getIn().setHeader("d", 1);
})
.end()
.log("Property a: ${exchangeProperty.a}")
.log("Header b: ${header.b}")
.log("Property c: ${exchangeProperty.c}")
.log("Header d: ${header.d}");
}
});
Exchange exchange = new ExchangeBuilder(camelContext)
.withProperty("a", 1)
.withHeader("b", 1)
.build();
camelContext.createProducerTemplate().send("direct:test", exchange);
}
这个日志(如你所料):
属性a: 1 头b: 1 属性c: Header d:
显然,它是通过返回null来实现的。因此,当我在 AggregationStrategy 中返回 null 时,我得到了实际的初始 Exchange,但没有办法在 AggregationStrategy 中直接访问该 Exchange?这在我看来有点奇怪,所以我猜测我漏掉了什么。
最诚挚的问候
TL;DR:我怎样才能将Exchange发送到另一个端点,并将消息重置为之前的样子,同时保留ExchangeProperties,在其他端点中设置。
你在这里的目标是用额外的属性来丰富你的原始payload.Enterprise Integration Patterns明智的enricher模式比multi cast更适合。https:/camel.apache.orgcomponentslatesteipscontent-enricher.html。 以及该页面中的ExampleAggregationStrategy类。
public class ExampleAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange original, Exchange resource) {
Object resourceResponse = resource.getIn().getBody();
// retrieve stuff you want from resource exchange and add them as properties to to original
return original;
}
}