Apache Camel:如何使用Aggregation来获得实际的老交换?

问题描述 投票:0回答:1

所有的人。

我目前的用例是,我需要调用路由中的一个端点,但在调用后继续使用原来的消息。我找到的方法是使用 multicast() 然后是 "聚合策略"。AggregationStrategies.useOriginal(),即 UseOriginalAggregationStrategy() 要说明的是,我并没有将消息发送到多个端点,只是使用了这个方法。multicast() 将数据 "重置 "到之前的状态。

现在我遇到的问题是,我必须从内部传播一些信息。multicast() 到外面的路由上,并在以后使用它(如下例)。它试图用更新的 UseOriginalAggregationStrategy()在这里,我只 "重置 "了 Message但留下 Exchange 原样,这样我就可以更新属性,并且在更新后仍然可以拥有它们。multicast()

AggregationStrategy:

public class UseOriginalMessageAggregationStrategy implements AggregationStrategy {

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange != null && oldExchange.getIn() != null) {
      newExchange.setIn(oldExchange.getIn());
    }
    return newExchange;
  }
}

UnitTest:

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      from("direct:test")
          .multicast(new UseOriginalMessageAggregationStrategy())
            .process(exchange -> { // This could also be a 'to()', it doesn't make a difference
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}

现在我期待的是这个日志:

属性a: 1头b: 1属性c: 1个头d:

但实际发生的情况是:

属性a: 1个标题b: 1个属性c: 1个标题d: 1 标题d: 1

请注意,"Header d "仍然被设置了,尽管它属于在 multicast(),现在应该已经被覆盖了。

经过调试,我注意到,发生这种情况是因为AggregationStrategy中的'oldExchange'实际上是空的,这时日志才有意义,因为被记录的正是'newExchange'。

现在我感到非常困惑。尽管'oldExchange'是空的,这意味着我们无法访问在这之前的Exchange。multicast()జజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజ UseOriginalAggregationStrategy() 仍然能够得到确切的交换。

这里用上面的UnitTest举个例子。

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {

      from("direct:test")
          .multicast(AggregationStrategies.useOriginal()) // <- Using the camel AggregationStrategy now
            .process(exchange -> {
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}

这个日志(如你所料):

属性a: 1 头b: 1 属性c: Header d:

显然,它是通过返回null来实现的。因此,当我在 AggregationStrategy 中返回 null 时,我得到了实际的初始 Exchange,但没有办法在 AggregationStrategy 中直接访问该 Exchange?这在我看来有点奇怪,所以我猜测我漏掉了什么。

最诚挚的问候

TL;DR:我怎样才能将Exchange发送到另一个端点,并将消息重置为之前的样子,同时保留ExchangeProperties,在其他端点中设置。

java apache-camel
1个回答
0
投票

你在这里的目标是用额外的属性来丰富你的原始payload.Enterprise Integration Patterns明智的enricher模式比multi cast更适合。https:/camel.apache.orgcomponentslatesteipscontent-enricher.html。 以及该页面中的ExampleAggregationStrategy类。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {

        Object resourceResponse = resource.getIn().getBody();
        // retrieve stuff you want from resource exchange and add them as properties to to original
        return original;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.