来源:甲骨文数据库
目标:Postgres
使用Kafka复制数据
当我插入时,它工作正常。 当我更新源上的记录时,它会在目标上放置一个新条目,而不是更新已经存在的信息。 当我删除源(oracle)表中的记录时,我没有看到目标上删除的记录
问题:插入工作正常,但更新和删除不工作。请让我知道我应该怎么做才能解决这个问题?我在这里错过了什么?
源连接器:
{
"name": "source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@192.168.91.139:1521/orcl1",
"connection.user": "sys as sysdba",
"connection.password": "oracle",
"topic.prefix": "person",
"mode": "incrementing",
"poll.interval.ms": "1000",
"incrementing.column.name":"ID",
"query": "SELECT * from person",
"numeric.mapping":"none",
"include.schema.changes": "true",
"validate.non.null": "false",
"value.converter.schemas.enable": "true",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}
}
水槽连接器:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics":"person",
"connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"delete.enabled": "true",
"value.converter.schemas.enable": "true",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}
}
我能想到几件事,证明删除/更新行为是合理的。
you shouldn't have the
transforms
, no need for it.
对于 key.converter
它应该是 StringConverter
而不是 AvroConverter
,您还需要通过设置 pk.fields
来定义 pk 字段
最终您的水槽连接器可能类似于:
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics":"person",
"connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
"auto.create": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "ID",
"delete.enabled": "true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
请检查这一点 文章 rmoffat