我有一个像这样的流程,可以按正文将一些查询发送到下一个流程:
.process(exchange -> {
List<GlossaryDTOResponse[]> glossaries = exchange.getIn().getBody(List.class);
List<String> queries = new ArrayList<>();
queries.add("countries=AF&planttypes=AA&status=DD&language=en-EN&pagesize=1");
queries.add("countries=BF&planttypes=AA&status=DB&language=en-EN&pagesize=1");
List<ActivityIssuesDTO> responses = new ArrayList<>();
// just to send an empity response array to create the final result
Util.setProperty(exchange, "response", responses);
exchange.getIn().setBody(queries);
})
然后我拆分正文以读取每个正文元素:
.split(body())
然后我读取每个元素中的正文并通过 HTTP 查询将其发送到微服务:
.process(exchange -> {
List<GlossaryDTOResponse> glossaryDTOResponses = exchange.getIn().getBody(List.class);
List<ActivityIssuesDTO> responses = exchange.getProperty("response", List.class);
Util.setProperty(exchange, "response", responses);
exchange.getIn().setHeader(Exchange.HTTP_QUERY, glossaryDTOResponses.get(0));
})
.toD(serviceUtils.getActivityEndpoint() + ServiceUtils.ACTIVITIES + Util.BRIDGE_ENDPOINT)
.unmarshal().json(ActivityIssuesDTO.class)
在此过程中,我在 ActivityIssuesDTO 中得到了最终回复,并与正文一起发送。
.process(exchange -> {
ActivityIssuesDTO activityIssuesDTO = exchange.getIn().getBody(ActivityIssuesDTO.class);
List<ActivityIssuesDTO> activityIssuesDTOs = exchange.getProperty("response", List.class);
activityIssuesDTOs.add(activityIssuesDTO);
exchange.getIn().setBody(activityIssuesDTOs);
})
.marshal().json();
问题是我看不到我的回复,也无法整理它。
谢谢你。
我删除了拆分,对于一项(查询)来说没问题,我可以看到我的响应,但对于许多查询我看不到我的响应。
我也用静态数组尝试过,结果是一样的。
据我了解,在每个拆分中,我们还需要一个聚合器来聚合我们的消息以继续该过程,我通过创建一个聚合器解决了这个问题,并将在我的聚合器中创建最终结果,该结果来自我的微服务。
这是最终结果:
.split(body())
.aggregationStrategy(new ActivityAggregator())
.streaming()
.to(direct(GET_QUERY_FINAL))
.end()
这是我的内部路线:
from(direct(GET_QUERY_FINAL))
.log("getting the query and send to activity ms")
.process(exchange -> {
String queries = exchange.getIn().getBody(String.class);
List<ActivityIssuesDTO> response = exchange.getProperty("response", List.class);
Util.setProperty(exchange, "response", response);
exchange.getIn().setHeader(Exchange.HTTP_QUERY, queries);
})
.toD(serviceUtils.getActivityEndpoint() + ServiceUtils.ACTIVITIES + Util.BRIDGE_ENDPOINT)
.unmarshal().json(ActivityIssuesDTO.class);
在我的聚合器中,我只是将最终结果放入我的新数组中。
谢谢。