我已关注https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots
的文档但是当我注册源连接器以通过写入信令表 '{"data- 来在具有 'WHERE' 条件 'last_name'='Walker' 的 'dbo.customers' 表上执行临时快照时集合”:[“dbo.customers”],“类型”:“增量”,“附加条件”:“last_name=Walker”}'。连接器仍然捕获并快照“customers”表中的所有行,而不是我预期的 1 行。
我不知道我错过了哪一步?
这是我的配置步骤:
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','[email protected]');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','[email protected]');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','[email protected]');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','[email protected]');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
{
"name": "customer-adhoc",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"topic.prefix" : "CDC",
"database.hostname" : "sqlserver12",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.names" : "testDB",
"snapshot.mode": "initial",
"schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"database.encrypt": "false",
"table.include.list": "dbo.customers,dbo.debezium_signal",
"column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
"schema.history.internal.store.only.captured.tables.ddl": "true",
"schema.history.internal.store.only.captured.databases.ddl": "true",
"incremental.snapshot.allow.schema.changes" : "true" ,
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"schema.name.adjustment.mode": "avro",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
"value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
"signal.data.collection": "testDB.dbo.debezium_signal",
"signal.kafka.topic":"CDC.dbz-signal",
"kafka.consumer.offset.commit.enabled": "true",
"signal.kafka.groupId": "customer-kafka-signal",
"signal.kafka.bootstrap.servers": "kafka12:9092"
}
}
Debezium 快照有 2 种类型:
initial
模式。当您重新启动连接器时,它会生成完整快照。这就是为什么你在开始时捕获了所有行。"additional-conditions"
应根据文档。或者你可以先无条件插入,如果有效就加上"additional-conditions"
。