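I want to convert SQL Server column names to lowercase when they are stored in a Kafka topic. I am using Debezium as my source connector.
This can be done with Jeremy Custenborder's Kafka Connect Common Transformations.
SQL Server table: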
Id   Name  Description  Weight  Pro_Id
101  aaa   Sample_Test  3.14    2020-02-21 13:32:06.5900000
102  eee   testdata1    3.14    2020-02-21 13:32:06.5900000
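As a rough illustration of the renaming this setup is meant to produce for the columns above, here is a minimal Python sketch. It is not the transform's own code. It assumes that, for these particular names, converting from UPPER_UNDERSCORE to LOWER_UNDERSCORE amounts to lowercasing the name and keeping the underscores, which is consistent with the field names that show up in the topic data in Step 4. The helper name `to_lower_underscore` is hypothetical.

```python
def to_lower_underscore(column_name: str) -> str:
    """Approximate the expected renaming (assumption: plain lowercasing,
    underscores kept as they are)."""
    return column_name.lower()


# Column names taken from the SQL Server table above.
columns = ["Id", "Name", "Description", "Weight", "Pro_Id"]

# Map each original column name to the name expected in the Kafka record.
renamed = {name: to_lower_underscore(name) for name in columns}

for original, lowered in renamed.items():
    print(f"{original} -> {lowered}")
```

Only the field names change; the values in each row are left untouched.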
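Step 1: Download the jar file for Jeremy Custenborder's Kafka Connect Common Transformations from Confluent Hub.
Step 2: Place the jar file in /usr/share/java or /kafka/libs, depending on your Kafka environment.
Step 3: Create the Debezium SQL Server source connector: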
{
  "name": "sqlserver_src_connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.server.name": "sqlserver",
    "database.hostname": "*.*.*.*",
    "database.port": "1433",
    "database.user": "username",
    "database.password": "password",
    "database.dbname": "db_name",
    "table.whitelist": "dbo.tablename",
    "transforms": "unwrap,changeCase",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.changeCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
    "transforms.changeCase.from": "UPPER_UNDERSCORE",
    "transforms.changeCase.to": "LOWER_UNDERSCORE",
    "database.history.kafka.bootstrap.servers": "*.*.*.*",
    "database.history.kafka.topic": "schema-changes-tablename"
  }
}
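One common way to create the connector is to POST this JSON to the Kafka Connect REST API at `/connectors`. The sketch below builds such a request with the Python standard library. The worker URL `http://localhost:8083` is an assumption (8083 is the usual default port), the helper names `build_create_request` and `register_connector` are hypothetical, and the config is shortened to the transform-related keys for brevity; in practice you would send the full config shown above.

```python
import json
import urllib.request


def build_create_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a POST request for the Kafka Connect REST endpoint /connectors."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(request: urllib.request.Request) -> dict:
    """Send the request to the Connect worker and return its JSON reply."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Shortened config: only the transform chain from the connector above.
config = {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "transforms": "unwrap,changeCase",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.changeCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
    "transforms.changeCase.from": "UPPER_UNDERSCORE",
    "transforms.changeCase.to": "LOWER_UNDERSCORE",
}

# Assumed worker address; adjust to your environment.
request = build_create_request("http://localhost:8083", "sqlserver_src_connector", config)
print(request.get_method(), request.full_url)
```

Calling `register_connector(request)` would actually send it; that call is left out here so the snippet runs without a Connect worker.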
Step 4: Kafka topic data:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      { "type": "string", "optional": false, "field": "name" },
      { "type": "string", "optional": true, "field": "description" },
      { "type": "double", "optional": true, "field": "weight" },
      { "type": "int64", "optional": false, "name": "io.debezium.time.NanoTimestamp", "version": 1, "field": "pro_id" }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 101,
    "name": "aaa",
    "description": "Sample_Test",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      { "type": "string", "optional": false, "field": "name" },
      { "type": "string", "optional": true, "field": "description" },
      { "type": "double", "optional": true, "field": "weight" },
      { "type": "int64", "optional": false, "name": "io.debezium.time.NanoTimestamp", "version": 1, "field": "pro_id" }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 102,
    "name": "eee",
    "description": "testdata1",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}
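Note that `pro_id` arrives as an `io.debezium.time.NanoTimestamp`, that is, an int64 count of nanoseconds since the Unix epoch, rather than as a formatted date. A consumer can turn it back into a readable timestamp along these lines. This is a minimal sketch: the function name is hypothetical, the value is interpreted as UTC by assumption (the encoded value itself carries no time zone), and Python's `datetime` keeps only microsecond precision.

```python
from datetime import datetime, timedelta, timezone


def nano_timestamp_to_datetime(nanos: int) -> datetime:
    """Convert nanoseconds since the epoch to a datetime (assumed UTC).

    Sub-microsecond digits are dropped because datetime cannot hold them.
    """
    seconds, remainder_nanos = divmod(nanos, 1_000_000_000)
    whole_seconds = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return whole_seconds + timedelta(microseconds=remainder_nanos // 1000)


# pro_id value taken from the payload above.
decoded = nano_timestamp_to_datetime(1582291926590000000)
print(decoded.isoformat())  # 2020-02-21T13:32:06.590000+00:00
```

The decoded value matches the `2020-02-21 13:32:06.5900000` stored in the `Pro_Id` column of the source table.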
Thanks to Jiri Pechanec and Chris Cranford (@Naros) from the Debezium community for their help.