如何使用 Debezium 处理 OracleDB 中经典的空字符串转换为 NULL

问题描述 投票:0回答:1

我正在使用 Kafka 集群,该集群将 MySQL 数据库中包含的数据复制到旧的 Oracle 数据库,以实现我已将源数据库连接到 Debezium Mysql 连接器,将接收器数据库连接到 用于 JDBC 的 Debezium 连接器.

我正在尝试复制数据库的每个表,但在接收器级别遇到问题。
基本上,在源服务器上有一个带有 NOT NULL 约束的表字段,并且在该数据库上插入数据的人通过插入空的“”字符串来解决该要求。

Debezium JDBC Sink 连接器设法正确读取源表的模式,并在 Sink DB 上重新创建它及其所有约束,但不幸的是 Oracle DB 将空字符串解释为 NULL 值,因此 INSERT 被拒绝DB 和 Debezium 都会崩溃(当然)。

这是 Debezium(源)写入 Kafka 集群的平均有效负载,您可以看到第三个字段包含一个空的双引号值:

....

"payload": {
        "before": null,
        "after": {
            "field1": 852,
            "field2": 480,
            "field3": ""
        },

...

是否有一种简单的方法可以在一般级别处理此问题,即拥有适用于可能包含空双引号的每个表的任何字段的解决方案,而无需指定字段名称?

因为不幸的是,用另一个值替换一个字段的值的单一消息转换需要字段的名称,这违背了让 Debezium 处理一切的目的。

这是 Debezium JDBC Sink Connector 的当前配置

{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@ldap://ldpap.com:3060/testdb,cn=OracleContext,dc=domain,dc=priv",
    "connection.username": "username",
    "connection.password": "password",
    "insert.mode": "insert",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics.regex": "(DBNAME\.DBNAME\.([^.]+)$)",
    "quote.identifiers": "true",
}

连接器正确绑定包含数据的 Kafka 主题,并将 Source DB 的一些表写入 Sink DB,直到到达有问题的表,然后崩溃。设置

"insert.mode":"upsert"
,用 Debezium 处理主键。

oracle apache-kafka apache-kafka-connect debezium
1个回答
0
投票

恐怕我真的找不到简单的解决方案。

显然,您也许能够创建自己的自定义 SMT,它会拦截数据并用其他一些可接受的字符或字符串替换空字符串 - 但是 a.这超出了我的专业知识,b.我认为这可能风险太大,需要进行广泛的测试。 (而且这些信息是通过人工智能获得的,因此风险可能更大。)

因此,我能想到的唯一相对简单的过程是找到MySQL中会引起此问题的所有表/列,并在传输之前在MySQL中解决它。例如删除非空约束或将空字符串更新为其他内容。

因此找到非空字符串列并测试它是否包含空字符串:

DELIMITER //
CREATE PROCEDURE FindEmptyStringColumns()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE tableName CHAR(64);
    DECLARE columnName CHAR(64);
    DECLARE cur CURSOR FOR 
        SELECT TABLE_NAME, COLUMN_NAME 
        FROM INFORMATION_SCHEMA.COLUMNS 
        WHERE TABLE_SCHEMA = DATABASE() 
        AND DATA_TYPE IN ('char', 'varchar', 'text') 
        AND IS_NULLABLE = 'NO';
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    OPEN cur;

    read_loop: LOOP
        FETCH cur INTO tableName, columnName;

        IF done THEN
            LEAVE read_loop;
        END IF;

        SET @s = CONCAT('SELECT "', tableName, '.', columnName, 
                        '" AS table_column, COUNT(*) AS empty_string_count ',
                        'FROM ', tableName, 
                        ' WHERE TRIM(', columnName, ') = ""');
        PREPARE stmt FROM @s;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END LOOP;

    CLOSE cur;
END//
DELIMITER ;

然后:

CALL FindEmptyStringColumns();

当然这只是一个建议,由您决定如何处理检测到的任何列。

© www.soinside.com 2019 - 2024. All rights reserved.