我正在使用 Kafka 集群,该集群将 MySQL 数据库中包含的数据复制到旧的 Oracle 数据库,以实现我已将源数据库连接到 Debezium Mysql 连接器,将接收器数据库连接到 用于 JDBC 的 Debezium 连接器.
我正在尝试复制数据库的每个表,但在接收器级别遇到问题。
基本上,在源服务器上有一个带有 NOT NULL 约束的表字段,并且在该数据库上插入数据的人通过插入空的“”字符串来解决该要求。
Debezium JDBC Sink 连接器设法正确读取源表的模式,并在 Sink DB 上重新创建它及其所有约束,但不幸的是 Oracle DB 将空字符串解释为 NULL 值,因此 INSERT 被拒绝DB 和 Debezium 都会崩溃(当然)。
这是 Debezium(源)写入 Kafka 集群的平均有效负载,您可以看到第三个字段包含一个空的双引号值:
....
"payload": {
"before": null,
"after": {
"field1": 852,
"field2": 480,
"field3": ""
},
...
是否有一种简单的方法可以在一般级别处理此问题,即拥有适用于可能包含空双引号的每个表的任何字段的解决方案,而无需指定字段名称?
因为不幸的是,用另一个值替换一个字段的值的单一消息转换需要字段的名称,这违背了让 Debezium 处理一切的目的。
这是 Debezium JDBC Sink Connector 的当前配置
{
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:oracle:thin:@ldap://ldpap.com:3060/testdb,cn=OracleContext,dc=domain,dc=priv",
"connection.username": "username",
"connection.password": "password",
"insert.mode": "insert",
"schema.evolution": "basic",
"database.time_zone": "UTC",
"topics.regex": "(DBNAME\.DBNAME\.([^.]+)$)",
"quote.identifiers": "true",
}
连接器正确绑定包含数据的 Kafka 主题,并将 Source DB 的一些表写入 Sink DB,直到到达有问题的表,然后崩溃。设置
"insert.mode":"upsert"
,用 Debezium 处理主键。
恐怕我真的找不到简单的解决方案。
显然,您也许能够创建自己的自定义 SMT,它会拦截数据并用其他一些可接受的字符或字符串替换空字符串 - 但是 a.这超出了我的专业知识,b.我认为这可能风险太大,需要进行广泛的测试。 (而且这些信息是通过人工智能获得的,因此风险可能更大。)
因此,我能想到的唯一相对简单的过程是找到MySQL中会引起此问题的所有表/列,并在传输之前在MySQL中解决它。例如删除非空约束或将空字符串更新为其他内容。
因此找到非空字符串列并测试它是否包含空字符串:
DELIMITER //
CREATE PROCEDURE FindEmptyStringColumns()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE tableName CHAR(64);
DECLARE columnName CHAR(64);
DECLARE cur CURSOR FOR
SELECT TABLE_NAME, COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND DATA_TYPE IN ('char', 'varchar', 'text')
AND IS_NULLABLE = 'NO';
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur;
read_loop: LOOP
FETCH cur INTO tableName, columnName;
IF done THEN
LEAVE read_loop;
END IF;
SET @s = CONCAT('SELECT "', tableName, '.', columnName,
'" AS table_column, COUNT(*) AS empty_string_count ',
'FROM ', tableName,
' WHERE TRIM(', columnName, ') = ""');
PREPARE stmt FROM @s;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END LOOP;
CLOSE cur;
END//
DELIMITER ;
然后:
CALL FindEmptyStringColumns();
当然这只是一个建议,由您决定如何处理检测到的任何列。