我已经在 Kafka 中成功实现了 debezium postgres 连接器,它拦截了我的 posgres 表中所做的更改。
我还成功实现了一个消费者,那就是订阅我的主题。
但是我想反序列化我的kafka消息:
我的消息如下所示:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "member_id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "created_date"
}
],
"optional": true,
"name": "dbserver01.membership.member.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "member_id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "created_date"
}
],
"optional": true,
"name": "dbserver01.membership.member.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver01.membership.member.Envelope"
},
"payload": {
"before": null,
"after": {
"member_id": 1004,
"first_name": "Paula",
"last_name": "Abdul",
"created_date": 1705881600000000
},
"source": {
"version": "1.9.7.Final",
"connector": "postgresql",
"name": "dbserver01",
"ts_ms": 1705952843423,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"37097688\",\"37098080\"]",
"schema": "membership",
"table": "member",
"txId": 771,
"lsn": 37098080,
"xmin": null
},
"op": "c",
"ts_ms": 1705952843807,
"transaction": null
}
}
现在本质上我想反序列化我的有效负载,包括“op”:“c”,它告诉我是否创建、更新或删除我的数据(在本例中为创建)。
我确信有一种处理我的消息的标准方法,但我找不到任何如何处理我的消息数据的示例。
我已经在我的消费者控制台应用程序中创建了一个成员类,但我不确定如何正确反序列化我的kafka消息。
我已阅读使用架构注册表,但我不确定如何继续使用该选项。
但是我的控制台应用程序在运行时因错误而崩溃。 程序 [27728] 已退出,代码为 0 (0x0)。
那是json数据。在这里阅读相关内容https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/how-to
基本上,您可以在 C# 中创建等效的结构,然后可以将其直接转换为它。