我创建了 Debezium 连接器,它的工作原理就像一个魅力:
{
"name": "debezium_title",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "xxx.xxx.xxx",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "mysql_debezium",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "when_needed",
"snapshot.new.tables": "parallel",
"database.include.list": "database",
"table.include.list": "database.table1,database.table2",
"database.history.kafka.topic": "mysql_debezium_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "xxx.xxx.xxx:9092",
"include.schema.changes": "false",
"transforms": "extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id"
}
}
现在,我想删除此连接器并创建一个没有表白名单的新连接器 - 以摄取整个数据库:
{
"name": "debezium_title_new",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "xxx.xxx.xxx",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "mysql_debezium_new",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "initial",
"database.include.list": "database",
"database.history.kafka.topic": "debezium_mysql_history_new",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "xxx.xxx.xxx:9092"
}
}
新增:连接器名称、数据库服务器名称和数据库历史kafka主题名称。
奇怪的是,它只为我在
"table.include.list"
中为第一个连接器设置的表创建主题!
kafka-topics --list --zookeeper xxx.xxx:2181
mysql_debezium_new.table1
mysql_debezium_new.table2
一段时间后出现错误:
Encountered change event for table database.table3 whose schema isn't known to this connector
(表 3 是数据库中的表的示例,该表不在第一个连接器的白名单中)
我的数据库中的其余表在哪里? :) 当我已经拥有带有表白名单的连接器时,如何强制 Debezium 从 mysql 移动所有表(删除第一个连接器并创建没有白名单的新连接器没有帮助)。
我还尝试将新表添加到白名单中,但它也不起作用 - 它不会创建新主题,也不会加载整个表 - 它会等待表中的第一个新记录,然后发生错误:架构是不知道这张桌子。
怎么样? Debezium 是否会为第一个连接器拍摄数据库快照,然后将其保存在某处并保留?我该如何重置?
Debezium 版本:1.5
该参数是可选的,如果您希望它获取所有非系统表,请完全省略该参数。
来自文档: 可选的以逗号分隔的正则表达式列表,与您希望 Debezium 捕获的表的完全限定表标识符相匹配;任何未包含在 table.include.list 中的表都将从捕获中排除。每个标识符的形式为 schemaName.tableName。默认情况下,连接器捕获指定架构的所有非系统表。不得与 table.exclude.list 一起使用。
上述 Debezium 文档的链接:https://debezium.io/documentation/reference/connectors/sqlserver.html
@Gignak,以下是我的一些建议:
要删除连接器,请使用:
curl -i -X DELETE <host>:<port>/connectors/debezium_title
要创建新连接器,请使用:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" <host>:<port>/connectors/ --data "<your json path>"
我还建议,当您需要该架构的所有表时,至少指定该架构,这样您就可以将此参数包含到您的 json 中:
"schema.include.list": "<your_schema>"
请记住,目前(2023/05)当您停止或删除连接器时,旧主题不会被删除,因此您应该采取不同的做法。
您只需删除kafka主题并重新运行应用程序即可。会起作用的。