Kafka Connect JDBC sink connector does not auto-create the table

Problem description

I am testing CDC with Debezium using the Kafka and Kafka Connect Docker images; the database is a standalone instance.

My sink connector configuration JSON looks like this:

    {
        "name": "jdbc-sink-test-oracle",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "dialect.name": "OracleDatabaseDialect",
            "table.name.format": "TEST",
            "topics": "oracle-db-source.DBZ_SRC.TEST",
            "connection.url": "jdbc:oracle:thin:@hostname:1521/DB",
            "connection.user": "DBZ_TARGET",
            "connection.password": "DBZ_TARGET",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "auto.create": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "pk.fields": "ID",
            "pk.mode": "record_key"
        }
    }

My source connector configuration JSON looks like this:


    {
        "name": "test-source-connector",
        "config": {
            "connector.class": "io.debezium.connector.oracle.OracleConnector",
            "tasks.max": "1",
            "database.server.name": "oracle-db-source",
            "database.hostname": "hostname",
            "database.port": "1521",
            "database.user": "clogminer",
            "database.password": "clogminer",
            "database.dbname": "DB",
            "database.oracle.version": "19",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "schema-changes.DBZ_SRC",
            "database.connection.adapter": "logminer",
            "table.include.list": "DBZ_SRC.TEST",
            "database.schema": "DBZ_SRC",
            "errors.log.enable": "true",
            "snapshot.lock.timeout.ms": "5000",
            "include.schema.changes": "true",
            "snapshot.mode": "initial",
            "decimal.handling.mode": "double"
        }
    }
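Both configurations are registered against the Kafka Connect REST API. As a minimal sketch (assuming the worker's REST endpoint is reachable at `localhost:8083`, which is an assumption about this setup, not something stated in the question), the registration can be done from Python with only the standard library:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint


def build_payload(name: str, config: dict) -> bytes:
    """Serialize the {"name": ..., "config": ...} body the Connect API expects."""
    return json.dumps({"name": name, "config": config}).encode("utf-8")


def register_connector(name: str, config: dict) -> int:
    """POST a connector config to POST /connectors; returns the HTTP status code."""
    req = urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=build_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

For example, `register_connector("jdbc-sink-test-oracle", sink_config)` with the sink config dict from above; a 201 response means Connect accepted the connector.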

With the above configuration I get this error:

    Error : 942, Position : 11, Sql = merge into "TEST" using (select :1  "ID", :2  "NAME", :3  "DESCRIPTION", :4  "WEIGHT" FROM dual) incoming on("TEST"."ID"=incoming."ID") when matched then update set "TEST"."NAME"=incoming."NAME","TEST"."DESCRIPTION"=incoming."DESCRIPTION","TEST"."WEIGHT"=incoming."WEIGHT" when not matched then insert("TEST"."NAME","TEST"."DESCRIPTION","TEST"."WEIGHT","TEST"."ID") values(incoming."NAME",incoming."DESCRIPTION",incoming."WEIGHT",incoming."ID"), OriginalSql = merge into "TEST" using (select ? "ID", ? "NAME", ? "DESCRIPTION", ? "WEIGHT" FROM dual) incoming on("TEST"."ID"=incoming."ID") when matched then update set "TEST"."NAME"=incoming."NAME","TEST"."DESCRIPTION"=incoming."DESCRIPTION","TEST"."WEIGHT"=incoming."WEIGHT" when not matched then insert("TEST"."NAME","TEST"."DESCRIPTION","TEST"."WEIGHT","TEST"."ID") values(incoming."NAME",incoming."DESCRIPTION",incoming."WEIGHT",incoming."ID"), Error Msg = ORA-00942: table or view does not exist


        at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
        ... 11 more
2022-07-28 15:01:58,644 ERROR  ||  WorkerSinkTask{id=jdbc-sink-test-oracle-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: ORA-00942: table or view does not exist

java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

I believe that, given the configuration above, the table should be auto-created, yet the error says the table does not exist.
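When a sink task dies like this, the same stack trace is also exposed by the Connect REST API under `GET /connectors/<name>/status`, which is handy for checking whether a restarted task keeps hitting the same ORA error. A small sketch for pulling the status and extracting the Oracle error codes from any failed task's trace (the `localhost:8083` endpoint is an assumption about this setup):

```python
import json
import re
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint


def oracle_error_codes(trace: str) -> list:
    """Extract distinct ORA-xxxxx codes from a task's stack trace string."""
    return sorted(set(re.findall(r"ORA-\d{5}", trace)))


def failed_task_errors(connector: str) -> list:
    """Fetch connector status and collect ORA codes from any FAILED task."""
    url = f"{CONNECT_URL}/connectors/{connector}/status"
    with urllib.request.urlopen(url) as resp:
        status = json.load(resp)
    codes = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            codes.extend(oracle_error_codes(task.get("trace", "")))
    return codes
```

Running `failed_task_errors("jdbc-sink-test-oracle")` against the trace shown above would surface `ORA-00942`.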

However, the following sink connector configuration works fine: the table "TEST2" is auto-created and the data from the source is exported into it:

    {
        "name": "jdbc-sink-test2-oracle",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "dialect.name": "OracleDatabaseDialect",
            "table.name.format": "TEST2",
            "topics": "oracle-db-source.DBZ_SRC.TEST",
            "connection.url": "jdbc:oracle:thin:@hostname:1521/DB",
            "connection.user": "DBZ_TARGET",
            "connection.password": "DBZ_TARGET",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "auto.create": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "pk.fields": "ID",
            "pk.mode": "record_key"
        }
    }

Edit:

The sink connector works fine if a target table with the same name and DDL as the source table has already been created, but it does not auto-create the target table when it does not exist yet.
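Given that observation, a hypothetical workaround is to pre-create `DBZ_TARGET.TEST` before starting the sink. The column names below come from the `MERGE` statement in the error message; the Oracle types are assumptions and must be matched to the real source DDL:

```python
# Column names taken from the failing MERGE statement; types are guesses
# and should be aligned with the actual DDL of DBZ_SRC.TEST.
DDL = """
CREATE TABLE DBZ_TARGET.TEST (
    ID          NUMBER PRIMARY KEY,
    NAME        VARCHAR2(255),
    DESCRIPTION VARCHAR2(255),
    WEIGHT      BINARY_DOUBLE
)
""".strip()


def main():
    # python-oracledb is one client that can run the DDL
    # (connection details copied from the sink config above).
    import oracledb
    with oracledb.connect(user="DBZ_TARGET", password="DBZ_TARGET",
                          dsn="hostname:1521/DB") as conn:
        with conn.cursor() as cur:
            cur.execute(DDL)


if __name__ == "__main__":
    main()
```

This sidesteps `auto.create` entirely rather than explaining why it fails for "TEST" but not "TEST2".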

oracle jdbc apache-kafka apache-kafka-connect
1 Answer

I ran into the same problem with Redpanda, Debezium, and PostgreSQL. I tried connecting with the setting "auto.create": "true", but it does not work.

My code:

    return this.connectorApi.post("/connectors", {
        name,
        "config": {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "connection.url": `jdbc:postgresql://${host}/${database}?useSSL=true`,
            "connection.username": user,
            "connection.password": password,
            "insert.mode": "update",
            "table.types": "TABLE",
            "pk.mode": "record_value",
            "topics": "mytopic",
            "auto.create": "true",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "fields.whitelist": "id, value",
            "auto.evolve": "true",
            "table.name.format": "features",
            "quote.identifiers": "true"
        }
    })