我正在尝试为 ServiceNow 设置 Apache Kafka 源和接收器连接器。接收器连接器工作完美,我能够使用以下转换将嵌套的 JSON 负载展平为 ServiceNow 列名称。 参考
"transforms": "FlattenSchema,RenameFields"
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."
基本上转换会在 JSON 以下变平
"payload": {
"prjtId": "123"
}
致
payload.prjtId
现在我正在尝试源连接器,即让 Apache Kafka 轮询 ServiceNow 表中的新记录并将其作为 Kafka 消息进行流式传输。流消息工作得很好,但我在将表记录转换为预期的嵌套 JSON 结构时遇到了麻烦。
我的 ServiceNow 表包含以下各列:
messageType
messageName
version
payload.projectId
payload.responseCode
当我使用 ServiceNow 连接器检索此记录作为 Kafka 消息时,我希望它的结构低于预期,
预期结果:
{
"messageType": "InventoryRecordUpdate",
"messageName": "Validated",
"version": "v1",
"payload": {
"projectId": "123456",
"responseCode" : "XXX"
}
}
但是当我尝试使用以下配置时,我得到如下所示的 result1 或 result2:
与:
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
Result1:它基本上在消息中抛出模式并将表列封装在“有效负载”下:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "u_messagetype"
},
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": true,
"field": "u_messagename"
},
{
"type": "string",
"optional": true,
"field": "u_payload_projectid"
},
{
"type": "string",
"optional": true,
"field": "u_payload_responsecode"
}
],
"optional": false
},
"payload": {
"u_messagetype": "InventoryRecordUpdate",
"u_version": "v2",
"u_messagename": "Validated",
"u_payload_projectid": "123456",
"u_payload_responsecode": "200"
}
}
并使用以下配置,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
结果2:排除了kafka消息正文中的发送模式,还删除了名为“payload”的字段,并按原样发送该字段而不嵌套,如下所示:
{
"u_messagetype": "InventoryRecordUpdate",
"u_version": "v2",
"u_messagename": "Validated",
"u_payload_projectid": "123456",
"u_payload_responsecode": "200"
}
我没有找到有关将键“u_payload_projectid”去扁平化为嵌套结构的文档。如果有人能指出我正确的方向,我将不胜感激。
不存在“去扁平化”的变换。您获得的数据很大程度上取决于连接器本身。由于模式返回两个顶级字符串字段,而不是记录/结构,因此在不修改连接器源代码的情况下,从连接端无法真正完成任何操作
因此,您需要编写一些流处理器来检测具有共享前缀的字段并自行重新映射数据