如何用c#反序列化kafka消息

问题描述 投票:0回答:1

我已经在 Kafka 中成功实现了 debezium postgres 连接器,它拦截了我的 posgres 表中所做的更改。

我还成功实现了一个消费者,那就是订阅我的主题。

但是我想反序列化我的kafka消息:

我的消息如下所示:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "member_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.MicroTimestamp",
                        "version": 1,
                        "field": "created_date"
                    }
                ],
                "optional": true,
                "name": "dbserver01.membership.member.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "member_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.MicroTimestamp",
                        "version": 1,
                        "field": "created_date"
                    }
                ],
                "optional": true,
                "name": "dbserver01.membership.member.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "dbserver01.membership.member.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "member_id": 1004,
            "first_name": "Paula",
            "last_name": "Abdul",
            "created_date": 1705881600000000
        },
        "source": {
            "version": "1.9.7.Final",
            "connector": "postgresql",
            "name": "dbserver01",
            "ts_ms": 1705952843423,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"37097688\",\"37098080\"]",
            "schema": "membership",
            "table": "member",
            "txId": 771,
            "lsn": 37098080,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1705952843807,
        "transaction": null
    }
}

现在本质上我想反序列化我的有效负载,包括“op”:“c”,它告诉我是否创建、更新或删除我的数据(在本例中为创建)。

我确信有一种处理我的消息的标准方法,但我找不到任何如何处理我的消息数据的示例。

我已经在我的消费者控制台应用程序中创建了一个成员类,但我不确定如何正确反序列化我的kafka消息。

我已阅读使用架构注册表,但我不确定如何继续使用该选项。

我尝试过看这个例子: https://github.com/confluenceinc/confluence-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs

但是我的控制台应用程序在运行时因错误而崩溃。 程序 [27728] 已退出,代码为 0 (0x0)。

c# apache-kafka kafka-consumer-api debezium
1个回答
-1
投票

那是json数据。在这里阅读相关内容https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/how-to

基本上,您可以在 C# 中创建等效的结构,然后可以将其直接转换为它。

© www.soinside.com 2019 - 2024. All rights reserved.