我是 Apache Camel 的新手,并在我的 Spring Boot 应用程序中创建以下方法:
from("timer:myTimer?period=60000")
.setBody(simple((currentTimeMillis()).toString(), Long::class.java))
.routeId("getCurrentTime")
.multicast().parallelProcessing()
.to("direct:GetCreated", "direct:GetDeleted")
from("direct:GetCreated")
.routeId("my-route-01") // I am not sure if I should set route id
.process(getCreatedProcessor)
.process { counters.getValue("CreatedCounter").increment() }
.to("direct:getCreatedRecords")
from("direct:GetDeleted")
.routeId("my-route-02")
.process(getDeletedProcessor)
.process { counters.getValue("DeletedCounter").increment() }
.to("direct:getDeletedRecords")
它可以正常工作,没有任何问题。但是,当我从这些方法合并数据(创建和删除)时,如果我使用“GetCreated”和“GetUpdated”等相同的 URL,则会收到“由于不允许同一端点的多个使用者而无法启动路由”。
我还尝试将
.to("direct:getCreatedRecords")
和 .to("direct:getDeletedRecords")
添加到这些方法中,并尝试使用它,如下所示。但在这种情况下,我无法获取数据(我看不到通过.to("stream:out")
)。那么,这些方法的路由定义有什么问题呢?
from("direct:GetCreated")
// from("direct:getCreatedRecords) // ???
.enrich("direct:GetDeleted") { oldExchange: Exchange, newExchange: Exchange ->
// .enrich("direct:getDeletedRecords") { oldExchange: Exchange, newExchange: Exchange -> // ???
if (newExchange == null) {
oldExchange
}
val oldBody = oldExchange.getIn().getBody(List::class.java as Class<List<CreatedDataDto>>)
val newBody = newExchange.getIn().getBody(List::class.java as Class<List<DeletedDataDto>>)
oldExchange.getIn().body = MergedDataDto(oldBody, newBody)
oldExchange
}
.to("stream:out")
.to("my-kafka-url")