如何使用 Debezium SQL Server 源连接器执行临时快照

问题描述 投票:0回答:1

我已关注https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots

的文档

但是当我注册源连接器以通过写入信令表 '{"data- 来在具有 'WHERE' 条件 'last_name'='Walker' 的 'dbo.customers' 表上执行临时快照时集合”:[“dbo.customers”],“类型”:“增量”,“附加条件”:“last_name=Walker”}'。连接器仍然捕获并快照“customers”表中的所有行,而不是我预期的 1 行。

我不知道我错过了哪一步?

这是我的配置步骤:

  1. 填充数据库
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','[email protected]');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
  1. 创建信令表并添加信号记录
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data) 
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
  1. 配置源连接器并启动它。
{
    "name": "customer-adhoc",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "CDC",
        "database.hostname" : "sqlserver12",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.names" : "testDB",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "database.encrypt": "false",
        "table.include.list": "dbo.customers,dbo.debezium_signal",
        "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "incremental.snapshot.allow.schema.changes" : "true" ,
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "schema.name.adjustment.mode": "avro",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "signal.data.collection": "testDB.dbo.debezium_signal",
        "signal.kafka.topic":"CDC.dbz-signal",
        "kafka.consumer.offset.commit.enabled": "true",
        "signal.kafka.groupId": "customer-kafka-signal",
        "signal.kafka.bootstrap.servers": "kafka12:9092"
    }
}
sql-server snapshot debezium connector adhoc
1个回答
0
投票

Debezium 快照有 2 种类型:

  • 使用 snapshot.mode 参数的初始快照。您有
    initial
    模式。当您重新启动连接器时,它会生成完整快照。这就是为什么你在开始时捕获了所有行。
  • 使用信号表的增量临时快照。每次插入信号时,您都会获得所需的数据集。因此,很容易检查:只需在连接器启动时向信号表插入一个请求即可。如果不起作用,请检查您的请求的格式。例如,
    "additional-conditions"
    应根据文档。或者你可以先无条件插入,如果有效就加上
    "additional-conditions"
© www.soinside.com 2019 - 2024. All rights reserved.