我有一条在源代码中创建并放入主题的 debezium 消息
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "col_001"
},
{
"type": "string",
"optional": true,
"field": "col_002"
},
{
"type": "int32",
"optional": true,
"field": "col_003"
},
{
"type": "int32",
"optional": true,
"field": "col_004"
},
{
"type": "int32",
"optional": true,
"field": "col_005"
},
{
"type": "int32",
"optional": true,
"field": "col_006"
},
{
"type": "int32",
"optional": true,
"field": "col_007"
},
{
"type": "string",
"optional": true,
"field": "col_008"
},
{
"type": "string",
"optional": true,
"field": "col_009"
},
{
"type": "string",
"optional": true,
"field": "col_010"
},
{
"type": "string",
"optional": true,
"field": "col_011"
},
{
"type": "int32",
"optional": true,
"field": "col_012"
},
{
"type": "int32",
"optional": true,
"field": "col_013"
},
{
"type": "int32",
"optional": true,
"field": "col_014"
},
{
"type": "string",
"optional": true,
"field": "col_015"
},
{
"type": "string",
"optional": true,
"field": "col_016"
},
{
"type": "string",
"optional": true,
"field": "col_017"
},
{
"type": "int32",
"optional": true,
"field": "col_018"
},
{
"type": "int64",
"optional": true,
"field": "_sling_loaded_at"
}
],
"optional": true,
"name": "tpcds.public.customer.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "col_001"
},
{
"type": "string",
"optional": true,
"field": "col_002"
},
{
"type": "int32",
"optional": true,
"field": "col_003"
},
{
"type": "int32",
"optional": true,
"field": "col_004"
},
{
"type": "int32",
"optional": true,
"field": "col_005"
},
{
"type": "int32",
"optional": true,
"field": "col_006"
},
{
"type": "int32",
"optional": true,
"field": "col_007"
},
{
"type": "string",
"optional": true,
"field": "col_008"
},
{
"type": "string",
"optional": true,
"field": "col_009"
},
{
"type": "string",
"optional": true,
"field": "col_010"
},
{
"type": "string",
"optional": true,
"field": "col_011"
},
{
"type": "int32",
"optional": true,
"field": "col_012"
},
{
"type": "int32",
"optional": true,
"field": "col_013"
},
{
"type": "int32",
"optional": true,
"field": "col_014"
},
{
"type": "string",
"optional": true,
"field": "col_015"
},
{
"type": "string",
"optional": true,
"field": "col_016"
},
{
"type": "string",
"optional": true,
"field": "col_017"
},
{
"type": "int32",
"optional": true,
"field": "col_018"
},
{
"type": "int64",
"optional": true,
"field": "_sling_loaded_at"
}
],
"optional": true,
"name": "tpcds.public.customer.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "tpcds.public.customer.Envelope",
"version": 1
},
"payload": {
"before": {
"col_001": 6,
"col_002": "AAAAAAAAGAAAAAAA",
"col_003": 213219,
"col_004": 6374,
"col_005": 27082,
"col_006": 2451883,
"col_007": 2451853,
"col_008": "Ms.",
"col_009": "Brunilda aaa bbb ccc ddd",
"col_010": "Sharp",
"col_011": "Y",
"col_012": 4,
"col_013": 12,
"col_014": 1925,
"col_015": "SURINAME",
"col_016": null,
"col_017": "[email protected]",
"col_018": 2452430,
"_sling_loaded_at": 1713464143
},
"after": {
"col_001": 6,
"col_002": "AAAAAAAAGAAAAAAA",
"col_003": 213219,
"col_004": 6374,
"col_005": 27082,
"col_006": 2451883,
"col_007": 2451853,
"col_008": "Ms.",
"col_009": "Brunilda aaa bbb ccc ddd eee",
"col_010": "Sharp",
"col_011": "Y",
"col_012": 4,
"col_013": 12,
"col_014": 1925,
"col_015": "SURINAME",
"col_016": null,
"col_017": "[email protected]",
"col_018": 2452430,
"_sling_loaded_at": 1713464143
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "tpcds",
"ts_ms": 1713471958528,
"snapshot": "false",
"db": "tpcds",
"sequence": "[\"108523976\",\"108524328\"]",
"schema": "public",
"table": "customer",
"txId": 777,
"lsn": 108524328,
"xmin": null
},
"op": "u",
"ts_ms": 1713471958951,
"transaction": null
}
}
我有接收器连接器,它需要这种格式
{
"col_001": 1,
"col_002": "AAAAAAAAGAAAAAAA",
"col_003": 213219,
"col_004": 6374,
"col_005": 27082,
"col_006": 2451883,
"col_007": 2451853,
"col_008": "Ms.",
"col_009": "Brunilda aaa",
"col_010": "Sharp",
"col_011": "Y",
"col_012": 4,
"col_013": 12,
"col_014": 1925,
"col_015": "SURINAME",
"col_016": null,
"col_017": "[email protected]",
"col_018": 2452430,
"_sling_loaded_at": 1713464143,
"__op": 1
}
我想我需要使用卡夫卡接收器转换,但我不知道该怎么做。有什么想法吗?
您尚未提供 Kafka Connect 配置,因此我无法直接说明如何使用它,但 Debezium 文档对此非常好且清晰。您基本上为其添加了一个
transforms
条目。