如何使用postgres JDBC接收器连接器流式传输到mysql连接器

问题描述 投票:0回答:2

周四快乐!我一直在尝试在 Debezium 中创建连接器 Postgres Connector,但只有当表已存在于我的 MySQL 实例中时,我才能捕获更改,这并不理想。因为那样我就必须用 Python 编写一个脚本来处理此类事件,并且使用已经存在的东西可能比重新发明轮子更容易。我希望能够捕获实际连接器中的 DDL。我看到这篇博文。 https://debezium.io/blog/2017/09/25/streaming-to-another-database/ 我让它在我的本地设置上工作,这很棒,但唯一的问题是我想去在相反的方向。 (我能够捕获新记录、删除记录和更新记录,并且如果它们不存在,它也会创建新表和新列)。我想从 postgres 进行流式传输,并将连接器插入到 mysql 中的目标数据库中。我尝试分别切换 jdbc 源和接收器连接器,但我没有将新记录从 postgres 插入到 mysql 中。似乎我可以发现人们到处都可以从 mysql 插入 postgres,但不能从另一个方向插入。这是我为使 mysql-kafka-postgres 正常工作而设置的 GitHub 目录。 https://github.com/debezium/debezium-examples/tree/main/unwrap-smt

我尝试采取不同的方式,但似乎它正在杀死我的 docker 映像,因为我启动时说“无法从 bootstrap.servers 解析服务器 kafka:9092,因为 kafka [org.apache.kafka.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients: ClientUtils]" 这是我的源 json 和我的接收器 json。

 {
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.customers",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
}
}


{
"name": "inventory-connector",
"config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "psql",
    "mode": "bulk",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "inventory",
    "table.include.list": "public.customers",
    "slot.name": "test_slot",
    "plugin.name": "wal2json",
    "database.server.name": "psql",
    "tombstones.on.delete": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}
}

我关注的博客上的其他所有内容都保持不变。欢迎任何帮助。

mysql postgresql apache-kafka apache-kafka-connect debezium
2个回答
1
投票

我相信这里有两个不同的问题:

  1. 如何处理Mysql中不存在的列。 JDBC 接收器连接器应该有一个名为

    auto.create
    的标志,如果设置为
    true
    则允许连接器创建不存在的表(
    auto.evolve
    还允许表演化)

  2. PG -> Kafka -> Mysql 是可以的,你可以在这里找到我之前写的一个例子。这些示例使用 Aiven for PostgreSQL 和 Aiven for Apache Kafka,但您应该能够调整连接器以在任何类型的 PG 和 Kafka 中工作。

知道你的 PG->Kafka->MySQL 管道停止工作会很有趣。

免责声明:我为 Aiven 工作


0
投票

我有一个例子这里。我相信它与发送到mysql时使用的接收器非常相似。

© www.soinside.com 2019 - 2024. All rights reserved.