我有一个遗留数据库,其主键列是字符串(是的,我知道)。我想使用 JDBC kafka Source Connector 进行从 postgres DB 到 kafka 主题的
increment
转储模式
以下是我重现问题的尝试
create table test(
id varchar(20) primary key,
name varchar(10)
);
INSERT INTO test(
id, name)
VALUES ('1ab', 't'),
('2ab', 't'),
('3ab', 't')
我的配置
{"name" : "test_connector",
"config" : {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://12.34.5.6:5432/",
"connection.user": "user",
"connection.password": "password",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic.prefix": "incre_",
"mode": "incrementing",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
"incrementing.column.name":"id",
"value.converter.schema.registry.url": "http://schema-registry_url.com",
"key.converter.schema.registry.url": "http://schema-registry_url.com",
"offset.flush.timeout.ms": 2000,
}
}
我发布配置后,当我执行 HTTP curl 时,状态为
RUNNING
。我检查的时候worker的日志里也没有错误日志
当我尝试做控制台消费者时,kafka主题中也没有数据
我还尝试了其他几种组合,例如添加"table.whitelist": "test"
。
我尝试的另一件事是关注这两个链接 https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/ https://www.confluence.io/blog/kafka-connect-deep-dive-jdbc-source-connector但没有任何帮助,即使是建议的聪明技巧,如
SELECT * from (SELECT id, name from test where ...)
所以在玩了几个小时不同的配置后。我回到官方文档并意识到这一点
使用自定义查询而不是加载表,允许您连接多个表中的数据。只要查询不包含自己的过滤,您仍然可以使用内置模式进行增量查询(在本例中,使用时间戳列)。请注意,这限制了每个连接器只能有一个输出,并且由于没有表名称,因此在这种情况下主题“前缀”实际上是完整的主题名称。
所以关键是
"topic.prefix": "incre_test"
继续之前的设置,正确的配置应该是
{"name" : "test_connector",
"config" : {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://12.34.5.6:5432/",
"connection.user": "user",
"connection.password": "password",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic.prefix": "incre_test",
"mode": "incrementing",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
"incrementing.column.name":"id",
"value.converter.schema.registry.url": "http://schema-registry_url.com",
"key.converter.schema.registry.url": "http://schema-registry_url.com",
"offset.flush.timeout.ms": 2000,
}
}
恐怕您无法在
varchar id
模式下使用 incrementing
,因为它不是递增列/类型。根据Confluence Docs,
递增列: 每行包含唯一 ID 的单个列,其中较新的行保证具有更大的 ID,即
栏。请注意,此模式只能检测新行。 无法检测到对现有行的更新,因此该模式应该只 用于不可变数据。您可能会使用此模式的一个示例 是在数据仓库中流式传输事实表时,因为这些是 通常仅插入。AUTOINCREMENT
如何向现有主题发送数据?