debezium sqlserver 连接器输出数字/小数字段的编码值

问题描述 投票:0回答:2

我在 SQL Server 中有这个表:

CREATE TABLE [dbo].[blast_info](
    [blast_id] [int] NOT NULL,
    [tnt_amount_kg] [decimal](18, 2) NOT NULL,
    [time_blasted] [datetime] NOT NULL,
    [hole_deep_ft] [numeric](9, 2) NULL,
    [hole_coord_n] [numeric](18, 6) NOT NULL,
    [hole_coord_e] [numeric](18, 6) NULL
) ON [PRIMARY]

debezium 配置为从 confluence 作为插件运行。数据被发布到 Kafka 中,但是当我通过 python 或控制台消费者读取它时,我看到数字和十进制类型的编码值:

{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":16,"tnt_amount_kg":"ANw=","time_blasted":1583125200000,"hole_deep_ft":"Aa4=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437126}
Processed a total of 38 messages

这是为什么?解决办法是什么?谢谢。

sql-server apache-kafka confluent-platform connector debezium
2个回答
1
投票

可能的快速修复(完全不推荐):

只需在 SQL Server 中使用

real
数据类型,而不是
numeric
decimal
,因为 Debezium 会将
real
存储为
float

长期修复:

Debezium SQL 服务器连接器文档中所述,它将

decimal
numeric
值存储为
binary
,由类
org.apache.kafka.connect.data.Decimal
表示。

您可以从消息本身检索此信息,但为此您需要在消息中启用架构。您可以通过设置

key.converter.schemas.enable=true
(对于消息键)和
value.converter.schemas.enable=true
(对于消息值)来完成此操作。

更改上述属性后,您的消息将包含架构信息。 参考这个例子:

表架构:

CREATE TABLE [dbo].[kafka_datatype](
    [id] [int] IDENTITY(1,1)  PRIMARY KEY,
    [col_value] [varchar](10) NULL,
    [create_date] [datetime] NULL,
    [col_decimal] [decimal](33, 18) NULL,
    [col_double] [real] NULL,
    [comments] [varchar](5000) NULL
) 

卡夫卡消息:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "col_value"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "create_date"
      },
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "18",
          "connect.decimal.precision": "33"
        },
        "field": "col_decimal"
      },
      {
        "type": "float",
        "optional": true,
        "field": "col_double"
      },
      {
        "type": "string",
        "optional": true,
        "field": "comments"
      }
    ],
    "optional": true,
    "name": "TEST.dbo.kafka_datatype.Value"
  },
  "payload": {
    "id": 15,
    "col_value": "test",
    "create_date": 1586335960297,
    "col_decimal": "AKg/JYrONaAA",
    "col_double": 12.12345,
    "comments": null
  }
}

请阅读Debezium SQL 服务器连接器文档 以了解 Debezium 如何处理数据类型。

现在来到消费者部分,根据您的需要使用接收器连接器(例如JDBC Sink Connector)。如果您想使用 python 或控制台消费者,您需要编写自己的反序列化器。

附注:

随着时间的推移可能出现的一个问题是,随着模式的存储,每条消息的主题大小都会增加。为了避免将模式持久化到消息中,您可以使用 Avro Converter 以及 Confluence 提供的Schema Registry

希望这有帮助!


1
投票

您可以查看此连接器属性

decimal.handling.mode
。我使用
decimal.handling.mode: double
来获取小数值。我还附上了 doc 链接

© www.soinside.com 2019 - 2024. All rights reserved.