Is there a way to convert key field values to lowercase in the Debezium SQL Server source connector?


I want to convert SQL Server column names to lowercase when they are stored in a Kafka topic. I am using Debezium as my source connector.

apache-kafka apache-kafka-connect confluent debezium
1 Answer

This can be done with Jeremy Custenborder's Kafka Connect Common Transformations.

SQL Server table:

Id  Name Description  Weight Pro_Id
101 aaa  Sample_Test  3.14   2020-02-21 13:32:06.5900000
102 eee  testdata1    3.14   2020-02-21 13:32:06.5900000
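
As a rough illustration of the effect being asked for, the sketch below lowercases the column names of the first row of this table. It is a simplified emulation in Python, not the library's code; the helper name `lowercase_field_names` is hypothetical and only top-level field names are handled.

```python
def lowercase_field_names(record: dict) -> dict:
    """Return a copy of the record with every top-level field name lowercased."""
    return {name.lower(): value for name, value in record.items()}


# First row of the sample table, keyed by the original column names.
row = {
    "Id": 101,
    "Name": "aaa",
    "Description": "Sample_Test",
    "Weight": 3.14,
    "Pro_Id": "2020-02-21 13:32:06.5900000",
}

renamed = lowercase_field_names(row)
print(list(renamed))  # ['id', 'name', 'description', 'weight', 'pro_id']
```

The values are left untouched; only the field names change, which matches the topic data shown in step 4.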

Step 1: Download the jar file for Jeremy Custenborder's Kafka Connect Common Transformations from Confluent Hub.

Step 2: Place the jar file in /usr/share/java or /kafka/libs, depending on your Kafka environment.

Step 3: Create the Debezium SQL Server source connector.

{
  "name": "sqlserver_src_connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.server.name": "sqlserver",
    "database.hostname": "*.*.*.*",
    "database.port": "1433",
    "database.user": "username",
    "database.password": "password",
    "database.dbname": "db_name",
    "table.whitelist": "dbo.tablename",
    "transforms": "unwrap,changeCase",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.changeCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
    "transforms.changeCase.from" : "UPPER_UNDERSCORE",
    "transforms.changeCase.to" : "LOWER_UNDERSCORE",
    "database.history.kafka.bootstrap.servers": "*.*.*.*",
    "database.history.kafka.topic": "schema-changes-tablename"
  }
}
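
One way to submit a connector definition like the one above is the Kafka Connect REST API (`POST /connectors`). The following is a minimal sketch using only the Python standard library. The worker address `http://localhost:8083` is an assumption, the helper names `build_request` and `register` are hypothetical, and the abbreviated `connector` dict stands in for the full JSON shown above.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker is listening on its default REST port.
CONNECT_URL = "http://localhost:8083/connectors"


def build_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a POST request carrying the connector definition as JSON."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(connector: dict, url: str = CONNECT_URL) -> dict:
    """Send the request and return the worker's JSON response."""
    with urllib.request.urlopen(build_request(connector, url)) as response:
        return json.loads(response.read().decode("utf-8"))


# Abbreviated definition; in practice use the full config from step 3.
connector = {
    "name": "sqlserver_src_connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "transforms": "unwrap,changeCase",
    },
}

# To actually create the connector (requires a running worker):
# print(register(connector))
```

Note that the config in this answer uses `ChangeCase$Value`, which affects the record value only. The same library also documents a `ChangeCase$Key` variant; if the key fields need lowercasing too, adding a second transform of that type is likely the way to go, though that is not verified here.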

Step 4: Kafka topic data

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "description"
      },
      {
        "type": "double",
        "optional": true,
        "field": "weight"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.NanoTimestamp",
        "version": 1,
        "field": "pro_id"
      }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 101,
    "name": "aaa",
    "description": "Sample_Test",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "description"
      },
      {
        "type": "double",
        "optional": true,
        "field": "weight"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.NanoTimestamp",
        "version": 1,
        "field": "pro_id"
      }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 102,
    "name": "eee",
    "description": "testdata1",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}
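
In the output above, `pro_id` is emitted as an `io.debezium.time.NanoTimestamp`, that is, an `int64` count of nanoseconds since the Unix epoch. The sketch below shows one way a consumer might turn it back into a readable timestamp. It assumes the value is interpreted as UTC, and the helper name `nano_timestamp_to_datetime` is hypothetical. Python's `datetime` only holds microseconds, so anything finer is dropped.

```python
from datetime import datetime, timezone


def nano_timestamp_to_datetime(nanos: int) -> datetime:
    """Convert nanoseconds since the epoch to a UTC datetime (microsecond precision)."""
    seconds, remainder_nanos = divmod(nanos, 1_000_000_000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=remainder_nanos // 1_000)


decoded = nano_timestamp_to_datetime(1582291926590000000)
print(decoded.isoformat())  # 2020-02-21T13:32:06.590000+00:00
```

The decoded value lines up with the `2020-02-21 13:32:06.5900000` shown in the SQL Server table.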

Thanks to Jiri Pechanec and Chris Cranford (@Naros) from the Debezium community for their help.
