在NiFi上如何区分来自两个不同Kafka的消息?

问题描述 投票:0回答:2

我有两个不同的应用程序,名为 A1 和 A2。每个应用程序都有自己的 Kafka 服务器。来自这两个 KAFKA 服务器(代理)的消息将发送至 NiFi。

每个 Kafka 都有不同的主题名称,基于此我可以区分来自 Kafka 的消息。但是除了 Kafka 的主题名称之外,NiFi 中还有其他方法来区分来自两个不同 Kafka 的消息吗?是否有任何 NiFi 处理器可以检查主题名称,然后决定下一步要采取的路线?

apache-kafka apache-nifi
2个回答
4
投票

如果您使用NiFi的Kafka处理器(ConsumeKafka/ConsumeKafkaRecord)从Kafka接收消息,它们将以

FlowFiles
的形式输出消息。它们带有一个名为
kafka.topic
的属性,该属性将包含消息来自的主题的名称。

要根据主题名称路由消息,您可以使用

RouteOnAttribute
处理器。举例来说,您有两个主题
topicA
topicB
。然后你必须像这样配置 RouteOnAttribute 处理器:

然后根据您的要求将关系

topic-a
topic-b
连接到单独的流。如果您要添加更多 Kafka 源,您所要做的就是用更多的动态关系更新
RouteOnAttribute
。例如:
topic-c : ${kafka.topic:equals('topicC')}


2
投票

另一个决议。您可以使用

CryptographicHashContent
处理器来实现此目的。您获得内容的唯一哈希码,并基于此您可以确定您的消息。

注意: 由于非标准遗留实现,

HashContent
处理器从 Apache NiFi 1.8.0 起被标记为已弃用,并且可能会在即将发布的版本中删除。

© www.soinside.com 2019 - 2024. All rights reserved.