在Kafka中,更新MongoDB中的文档时,“Key”与“Id”不匹配

问题描述 投票:0回答:2

我们正在尝试使用 com.mongodb.kafka.connect.MongoSourceConnector 将所有记录从 MongoDB 获取到 Kafka。用于连接器的设置如下:

{
    "name": "mongo-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb:***:27017/?authSource=admin&replicaSet=myMongoCluster&authMechanism=SCRAM-SHA-256",
        "database": "someDb",
        "collection": "someCollection",
        "output.format.value":"json",
        "output.format.key":"json",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"false",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "publish.full.document.only": "true",
        "change.stream.full.document":"updateLookup",
        "copy.existing": "true"
    }
}

当所有文档最初从 MongoDB 上传到 Kafka 时,“key”对应于 Mongo 文档中的“id”:

{"_id": {"_id": {"$oid": "5e54fd0fbb5b5a7d35737232"}, "copyingData": true}}

但是当 MongoDB 中的文档更新时,具有不同“密钥”的更新会进入 Kafka:

{"_id": {"_data": "82627B2EF6000000022B022C0100296E5A1004A47945EC361D42A083988C14D982069C46645F696400645F0FED2B3A35686E505F5ECA0004"}}

因此,消费者无法识别最初上传的文档并对其进行更新。

请帮助我找到 Kafka、Connector 或 MongoDB 端的哪些设置负责此问题,以及如何将 Kafka 中的“Key”更改为与初始上传期间相同的值。

mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector
2个回答
0
投票

我们面临着同样的问题,经过一番搜索后,我们开始使用以下配置。我们定义了 avro 模式来提取输出模式键。密钥是一致生成的,看起来像

结构{fullDocument._id=e2ce4bfe-d03a-4192-830d-895df5a4b095}

这里 "e2ce4bfe-d03a-4192-830d-895df5a4b095" 是文档 ID。

{
  "change.stream.full.document" : "updateLookup",
  "connection.uri" : "<connection_uri>",
  "connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
  "collection": "someCollection",
  "copy.existing" : "true",
  "database" : "someDb",
  "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable" : "false",
  "key.serializer" : "org.apache.kafka.connect.storage.StringConverter",
  "name" : "mongo-source",
  "output.format.key" : "schema",
  "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson"
  "publish.full.document.only" : "true",
  "output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"fullDocument._id\",\"type\":\"string\"}]}",      
  "tasks.max" : "1",
  "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schemas.enable" : "false"
}  

0
投票

我建议您附上由

mongodb
开发的 MongoSourceConnector,而不是 Debezium 的。这是因为后者需要比前者更复杂的权限。幸运的是,mongodb 的连接器还支持基于日志的 CDC,即插入/更新/删除查询。

connector.class: com.mongodb.kafka.connect.MongoSourceConnector

就像 Abhra 所说,您可以过滤

fullDocument._id
作为消息的键。

定义

publish.full.document.only
output.schema.key
配置选项如下:

output.format.key: schema
output.json.formatter: com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
publish.full.document.only: true
output.schema.key: "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"fullDocument._id\",\"type\":\"string\"}]}"

如果您想要消息的元数据或排除字段,您可以使用

pipeline
,如下所示:

pipeline: "[{\"$match\": {\"operationType\": \"insert\"}}, {\"$project\": {\"operationType\": 0, \"clusterTime\": 0, \"documentKey\": 0}}]"

您只需要做的最后一件事就是使用 SMT 转换 Kafka Message(尤其是

ExtractField
)。

transforms: ExtractField
transforms.ExtractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractField.field: fullDocument._id
© www.soinside.com 2019 - 2024. All rights reserved.