Apache Camel 抛出“由于不允许同一端点有多个消费者而无法启动路由”

问题描述 投票:0回答:0

我是 Apache Camel 的新手,并在我的 Spring Boot 应用程序中创建以下方法:

from("timer:myTimer?period=60000")
    .setBody(simple((currentTimeMillis()).toString(), Long::class.java))
    .routeId("getCurrentTime")
    .multicast().parallelProcessing()
    .to("direct:GetCreated", "direct:GetDeleted")
from("direct:GetCreated")
.routeId("my-route-01") // I am not sure if I should set route id
.process(getCreatedProcessor)
.process { counters.getValue("CreatedCounter").increment() }
.to("direct:getCreatedRecords")


from("direct:GetDeleted")
.routeId("my-route-02")
.process(getDeletedProcessor)
.process { counters.getValue("DeletedCounter").increment() }
.to("direct:getDeletedRecords")

它可以正常工作,没有任何问题。但是,当我从这些方法合并数据(创建和删除)时,如果我使用“GetCreated”和“GetUpdated”等相同的 URL,则会收到“由于不允许同一端点的多个使用者而无法启动路由”。

我还尝试将

.to("direct:getCreatedRecords")
.to("direct:getDeletedRecords")
添加到这些方法中,并尝试使用它,如下所示。但在这种情况下,我无法获取数据(我看不到通过
.to("stream:out")
)。那么,这些方法的路由定义有什么问题呢?

from("direct:GetCreated")
// from("direct:getCreatedRecords) // ???
    .enrich("direct:GetDeleted") { oldExchange: Exchange, newExchange: Exchange ->
    // .enrich("direct:getDeletedRecords") { oldExchange: Exchange, newExchange: Exchange -> // ???
        if (newExchange == null) {
            oldExchange
        }
        val oldBody = oldExchange.getIn().getBody(List::class.java as Class<List<CreatedDataDto>>)
        val newBody = newExchange.getIn().getBody(List::class.java as Class<List<DeletedDataDto>>)
        oldExchange.getIn().body = MergedDataDto(oldBody, newBody)
        oldExchange
    }
    .to("stream:out") 
    .to("my-kafka-url")
java spring-boot apache kotlin apache-camel
© www.soinside.com 2019 - 2024. All rights reserved.