如何将 Kafka 消息转换为不同的 JSON 格式

问题描述 投票:0回答:1

我有一条由源连接器(source connector)创建并写入主题的 Debezium 消息:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_001"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_002"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_003"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_004"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_005"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_006"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_007"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_008"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_009"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_010"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_011"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_012"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_013"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_014"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_015"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_016"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_017"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_018"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "_sling_loaded_at"
                    }
                ],
                "optional": true,
                "name": "tpcds.public.customer.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_001"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_002"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_003"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_004"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_005"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_006"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_007"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_008"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_009"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_010"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_011"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_012"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_013"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_014"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_015"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_016"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_017"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_018"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "_sling_loaded_at"
                    }
                ],
                "optional": true,
                "name": "tpcds.public.customer.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "tpcds.public.customer.Envelope",
        "version": 1
    },
    "payload": {
        "before": {
            "col_001": 6,
            "col_002": "AAAAAAAAGAAAAAAA",
            "col_003": 213219,
            "col_004": 6374,
            "col_005": 27082,
            "col_006": 2451883,
            "col_007": 2451853,
            "col_008": "Ms.",
            "col_009": "Brunilda aaa bbb ccc ddd",
            "col_010": "Sharp",
            "col_011": "Y",
            "col_012": 4,
            "col_013": 12,
            "col_014": 1925,
            "col_015": "SURINAME",
            "col_016": null,
            "col_017": "[email protected]",
            "col_018": 2452430,
            "_sling_loaded_at": 1713464143
        },
        "after": {
            "col_001": 6,
            "col_002": "AAAAAAAAGAAAAAAA",
            "col_003": 213219,
            "col_004": 6374,
            "col_005": 27082,
            "col_006": 2451883,
            "col_007": 2451853,
            "col_008": "Ms.",
            "col_009": "Brunilda aaa bbb ccc ddd eee",
            "col_010": "Sharp",
            "col_011": "Y",
            "col_012": 4,
            "col_013": 12,
            "col_014": 1925,
            "col_015": "SURINAME",
            "col_016": null,
            "col_017": "[email protected]",
            "col_018": 2452430,
            "_sling_loaded_at": 1713464143
        },
        "source": {
            "version": "2.5.4.Final",
            "connector": "postgresql",
            "name": "tpcds",
            "ts_ms": 1713471958528,
            "snapshot": "false",
            "db": "tpcds",
            "sequence": "[\"108523976\",\"108524328\"]",
            "schema": "public",
            "table": "customer",
            "txId": 777,
            "lsn": 108524328,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1713471958951,
        "transaction": null
    }
}

我有一个接收器连接器(sink connector),它需要如下格式的消息:

{
            "col_001": 1,
            "col_002": "AAAAAAAAGAAAAAAA",
            "col_003": 213219,
            "col_004": 6374,
            "col_005": 27082,
            "col_006": 2451883,
            "col_007": 2451853,
            "col_008": "Ms.",
            "col_009": "Brunilda aaa",
            "col_010": "Sharp",
            "col_011": "Y",
            "col_012": 4,
            "col_013": 12,
            "col_014": 1925,
            "col_015": "SURINAME",
            "col_016": null,
            "col_017": "[email protected]",
            "col_018": 2452430,
            "_sling_loaded_at": 1713464143,
            "__op": 1
}

我想我需要在 Kafka 接收器连接器上使用转换(transforms,即 SMT),但我不知道该怎么做。有什么想法吗?

apache-kafka-connect debezium
1个回答
0
投票

使用 Debezium 的事件扁平化(Event Flattening)/新记录状态提取(New Record State Extraction)转换

您尚未提供 Kafka Connect 配置,因此我无法直接说明具体如何使用它,但 Debezium 文档对此讲解得非常清晰。基本上,您只需在连接器配置中为其添加一个

transforms
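条目,例如 transforms=unwrap 并设置 transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState;如需在结果中附加 __op 字段,可再设置 transforms.unwrap.add.fields=op。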

© www.soinside.com 2019 - 2024. All rights reserved.