周四快乐!我一直在尝试在 Debezium 中创建连接器 Postgres Connector,但只有当表已存在于我的 MySQL 实例中时,我才能捕获更改,这并不理想。因为那样我就必须用 Python 编写一个脚本来处理此类事件,并且使用已经存在的东西可能比重新发明轮子更容易。我希望能够捕获实际连接器中的 DDL。我看到这篇博文。 https://debezium.io/blog/2017/09/25/streaming-to-another-database/ 我让它在我的本地设置上工作,这很棒,但唯一的问题是我想去在相反的方向。 (我能够捕获新记录、删除记录和更新记录,并且如果它们不存在,它也会创建新表和新列)。我想从 postgres 进行流式传输,并将连接器插入到 mysql 中的目标数据库中。我尝试分别切换 jdbc 源和接收器连接器,但我没有将新记录从 postgres 插入到 mysql 中。似乎我可以发现人们到处都可以从 mysql 插入 postgres,但不能从另一个方向插入。这是我为使 mysql-kafka-postgres 正常工作而设置的 GitHub 目录。 https://github.com/debezium/debezium-examples/tree/main/unwrap-smt
我尝试采取不同的方式,但似乎它正在杀死我的 docker 映像,因为我启动时说“无法从 bootstrap.servers 解析服务器 kafka:9092,因为 kafka [org.apache.kafka.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients.clients: ClientUtils]" 这是我的源 json 和我的接收器 json。
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "postgres.public.customers",
"connection.url": "jdbc:mysql://mysql:3306/inventory",
"connection.user": "debezium",
"connection.password": "dbz",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "psql",
"mode": "bulk",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "inventory",
"table.include.list": "public.customers",
"slot.name": "test_slot",
"plugin.name": "wal2json",
"database.server.name": "psql",
"tombstones.on.delete": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}
}
我关注的博客上的其他所有内容都保持不变。欢迎任何帮助。
我相信这里有两个不同的问题:
如何处理Mysql中不存在的列。 JDBC 接收器连接器应该有一个名为
auto.create
的标志,如果设置为 true
则允许连接器创建不存在的表(auto.evolve
还允许表演化)
PG -> Kafka -> Mysql 是可以的,你可以在这里找到我之前写的一个例子。这些示例使用 Aiven for PostgreSQL 和 Aiven for Apache Kafka,但您应该能够调整连接器以在任何类型的 PG 和 Kafka 中工作。
知道你的 PG->Kafka->MySQL 管道停止工作会很有趣。
免责声明:我为 Aiven 工作
我有一个例子这里。我相信它与发送到mysql时使用的接收器非常相似。