当前,我有一个接收器连接器,该接收器从主题A获取数据并将其发送到外部服务。
现在我有一个用例,当我基于某种逻辑将其发送给主题B而不是服务时。并且此逻辑基于目标服务的响应,它将基于数据返回响应。因此,因为每次我无法使用流api时都应将数据发送到目标系统。
以某种方式可行吗?
还是我应该手动将kafka生产者添加到我的接收器?如果有,那么有什么缺点吗?
第一个选择是创建一个custom Kafka Connect Single Message Transform,它将实现所需的逻辑并可能也使用ExtractTopic
(取决于您的自定义smt的外观)。
第二个选择是建立自己的消费者。例如:
第1步:在主题A之上再创建一个主题
再创建一个话题,说ExtractTopic
第2步:实施您的自定义使用者
代替您的Kafka Connect Sink连接器,实现实现所需的Kafka Consumer。
[此时,您需要实例化一个Kafka Producer,并根据逻辑来决定是否需要将该主题转发到topic_a_to_target_system
或目标系统(topic_B
)。
第3步:启动topic_a_to_target_system
上的接收器连接器
最后启动您的接收器连接器,以便将数据从主题topic_a_to_target_system
接收到目标系统。