Error: PK mode for table 'person' is RECORD_KEY, but record key schema is missing

Problem description

Source: Oracle database

Target: Postgres

Data is replicated via Kafka

Inserts work fine. But when I update a record on the source, a new row is inserted on the target instead of the existing row being updated, and when I delete a record from the source (Oracle) table, the delete is not reflected on the target.

Question: inserts work, but updates and deletes do not. What should I do to fix this? What am I missing?

Source connector:

{
    "name": "source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:oracle:thin:@192.168.91.139:1521/orcl1",
        "connection.user": "sys as sysdba",
        "connection.password": "oracle",
        "topic.prefix": "person",
        "mode": "incrementing",
        "poll.interval.ms": "1000",
        "incrementing.column.name": "ID",
        "query": "SELECT * from person",
        "numeric.mapping": "none",
        "include.schema.changes": "true",
        "validate.non.null": "false",
        "value.converter.schemas.enable": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}
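
For reference, the source connector above defines no transform that would set a message key, so (as far as I understand the Confluent JDBC source connector's default behaviour) each row is published with an empty key and an Avro value holding the row's columns. A rough sketch of one such record rendered as JSON — the NAME column and its value are made up for illustration:

    {
        "key": null,
        "value": {
            "ID": 1,
            "NAME": "Alice"
        }
    }

This matters for the sink below, which is configured with "pk.mode": "record_key" and therefore needs a usable key on every record.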

Sink connector:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "person",
        "connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "value.converter.schemas.enable": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}
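
A note on the delete path in this configuration: with "delete.enabled": "true" the JDBC sink connector deletes a row only when it receives a tombstone record, i.e. a message whose key identifies the row and whose value is null, and that setting requires "pk.mode": "record_key". A sketch of such a tombstone (the key value is an assumption):

    {
        "key": "1",
        "value": null
    }

The error in the title is the sink rejecting records that arrive without any key schema, so neither upserts nor deletes can be matched to a primary key.
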
apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform database-replication
1 Answer

A couple of things come to mind that would explain the delete/update behaviour.

You shouldn't have the transforms entries; there's no need for them. For key.converter it should be StringConverter rather than AvroConverter, and you also need to define the PK field by setting pk.fields.

In the end your sink connector could look something like this:

        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics":"person",
        "connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "ID",
        "delete.enabled": "true",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"

Please also check this article by rmoffat.
