IntegrationFlowDefinition.aggregation不起作用。也许CorrelationStrategy失效了?

问题描述 投票:1回答:1

错误信息。原因是:java.lang.IllegalStateException: Null correlation not allowed. 也许是CorrelationStrategy失败了?

我的实现。

    @Bean
    public IntegrationFlow start() {
        return IntegrationFlows
                .from("getOrders")
                .split()
                .publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
                        .<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
                        .split(Item.class, Item::getItems)
                        .transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
                                     //  Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
                        .aggregate() // so, here aggregate method needs to aggregate ItemProperties.
                        .handle() // handler gets List<ItemProperty> as an input.
      ))
      .get();
    }

两个分路器都能正常工作。我也测试过第二台分路器后的变压器,工作正常。但是,到了汇总的时候,却出现了故障。我这里缺什么?

spring-integration spring-integration-dsl
1个回答
1
投票

你忽略了这样一个事实 transformer 是这种类型的端点,它可以原封不动地处理整个消息。如果你自己创建了一个消息,它不会修改它。MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build(); 你只是在分割器之后错过了重要的序列细节头。因此,之后的聚合器不知道该怎么处理你的消息,因为你为默认的相关策略配置了它,但你没有在消息中提供各自的头信息。

从技术上讲,我看不出有什么理由在那边手动创建一个消息:简单的 return createItemProperty(getItemName, getItemId); 对你来说应该足够了。而框架会代你去创建消息,并复制各自的请求消息头。

如果你真的还认为你需要自己在那个变换中创建一个消息,那么你需要考虑到的是 copyHeaders() 关于 MessageBuilder 从请求报文中携带所需的序列细节头。

© www.soinside.com 2019 - 2024. All rights reserved.