我可以根据内容使用 Mongo Sink Connector 忽略整个消息吗?

问题描述 投票:0回答:1

我对 Kafka 和 MongoDB 都很陌生,但我有一个现有的 Kafka 主题,我想将其消费到 Mongo 数据库中。我试图避免构建自己的消费者或连接器,而是仅使用Sink Connector。这样,我可以避免使用单独的连接器/消费者组件所带来的额外维护、成本和复杂性,也可以避免首先构建它的工作。复杂的部分是我想在进行任何插入之前以两种方式处理消息:

  1. 忽略其值不满足条件的消息(例如,颜色 = 绿色)

  2. 对于满足此条件的人,仅将消息中的某些字段插入到集合中

我认为我可以使用后处理器来完成转换部分。但是,我还没有看到有条件地完全忽略某些消息的示例。我本以为与转换数据相比,这对于接收器连接器来说是相当微不足道的事情。我是不是错过了什么?

mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector
1个回答
0
投票

您可以执行以下操作:

  1. 使用 redhat debezium filter smt 仅接受满足条件的消息。
    transforms=filter
    transforms.filter.type=io.debezium.transforms.Filter
    transforms.filter.language=jsr223.groovy
    transforms.filter.condition=value.colour == 'green'
    
  2. 配置sink-connector后处理器以便仅加载到集合中必需的字段。
    post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
    value.projection.type=AllowList
    value.projection.list=colour,size,etc.
    

这是一个如何利用第三方 Debezium 过滤器 smt 来过滤消息以及如何利用本机 MongoDB Kafka 接收器连接器后处理器来转换输出文档的示例。

© www.soinside.com 2019 - 2024. All rights reserved.