基于查询的 JDBC 源连接器 Kafka

问题描述 投票:0回答:3

我有一个遗留数据库,其主键列是字符串(是的,我知道)。我想使用 JDBC kafka Source Connector 进行从 postgres DB 到 kafka 主题的

increment
转储模式

以下是我重现问题的尝试

create table test(
id varchar(20) primary key,
name varchar(10) 
);

INSERT INTO test(
    id, name)
VALUES ('1ab', 't'),
('2ab', 't'),
('3ab', 't')

我的配置

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}

我发布配置后,当我执行 HTTP curl 时,状态为

RUNNING
。我检查的时候worker的日志里也没有错误日志 当我尝试做控制台消费者时,kafka主题中也没有数据 我还尝试了其他几种组合,例如添加
"table.whitelist": "test"

我尝试的另一件事是关注这两个链接 https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/ https://www.confluence.io/blog/kafka-connect-deep-dive-jdbc-source-connector但没有任何帮助,即使是建议的聪明技巧,如

SELECT * from (SELECT id, name from test where ...)

jdbc apache-kafka apache-kafka-connect
3个回答
2
投票

所以在玩了几个小时不同的配置后。我回到官方文档并意识到这一点

使用自定义查询而不是加载表,允许您连接多个表中的数据。只要查询不包含自己的过滤,您仍然可以使用内置模式进行增量查询(在本例中,使用时间戳列)。请注意,这限制了每个连接器只能有一个输出,并且由于没有表名称,因此在这种情况下主题“前缀”实际上是完整的主题名称。

所以关键是

"topic.prefix": "incre_test"

继续之前的设置,正确的配置应该是

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_test",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}

0
投票

恐怕您无法在

varchar id
模式下使用
incrementing
,因为它不是递增列/类型。根据Confluence Docs

递增列: 每行包含唯一 ID 的单个列,其中较新的行保证具有更大的 ID,即

AUTOINCREMENT
栏。请注意,此模式只能检测新行。 无法检测到对现有行的更新,因此该模式应该只 用于不可变数据。您可能会使用此模式的一个示例 是在数据仓库中流式传输事实表时,因为这些是 通常仅插入。


0
投票

如何向现有主题发送数据?

© www.soinside.com 2019 - 2024. All rights reserved.