Apache Camel-Kafka组件-单一生产者多个消费者

问题描述 投票:0回答:1

[我正在创建两个Apache骆驼(蓝图XML)kafka项目,一个是kafka-producer,它接受请求并将其存储在kafka服务器中,另一个是kafka-consumer,它从kafka服务器中提取消息并进行处理。

此设置适用于单个主题和单个使用者。但是,如何在同一kafka主题中创建单独的消费者组?如何在不同的消费者组中的同一主题内路由多个消费者特定的消息?任何帮助表示赞赏。谢谢。

java apache-kafka apache-camel kafka-consumer-api kafka-producer-api
1个回答
0
投票

您的问题非常笼统,因为您不清楚要解决的问题是什么,因此很难理解是否有更好的方法来实现该解决方案。

无论如何,据我所知,您正在寻找的是选择性消费者(EIP),Kafka和消费者API不支持即用型。选择性使用者可以根据生产者预先提供的特定选择器的值,从队列或主题中选择要选择的消息。该功能也必须在消息代理中实现,但是kafka没有这种功能。

Kafka确实实现了纯发布/订阅与队列之间的混合解决方案。就是说,您可以做的是与一个或多个消费者组(稍后再讨论)订阅该主题,并通过检查消息本身来过滤出您不感兴趣的所有消息。在消息传递和EIP世界中,此模式称为“筛选器数组”。可以想象,这种情况是在将消息广播给所有订户之后发生的;因此,如果该解决方案不符合您的要求或上下文,那么您可以考虑实现一个基于内容的路由器,该路由器旨在仅在集中控制下将消息分发给一部分消费者(这意味着特定于消费者的中间渠道)。当然)。

移至第二个问题,这是Kafka Component官方网站:https://camel.apache.org/components/latest/kafka-component.html。为了创建不同的消费者组,您只需定义多个路由,每个路由都有一个专用的groupId。通过添加groupdId属性,您将通知消费者组协调员(位于Kafka经纪人中)存在多个分离的消费者组,并且经纪人将使用它们来区分和分开对待(通过向他们发送每个的副本)日志消息存储在主题中)...

这里是一个例子:

public void configure() throws Exception {
    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=myFirstConsumerGroup"
            .log("Message received by myFirstConsumerGroup : ${body}");

    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=mySecondConsumerGroup"
            .log("Message received by mySecondConsumerGroup : ${body}");

}

如您所见,我在同一RouteBuilder中创建了两条路由,更不用说在同一Java进程中了。在我能想到的大多数用例中,这是一个非常糟糕的设计决策,因为没有单一的责任,分离的关注并且它们不会扩展。但是同样,这取决于您的要求/上下文。我试图保持高水平,以便进行讨论。如果您有新的更新,我将编辑我的答案。希望我有所帮助!

© www.soinside.com 2019 - 2024. All rights reserved.