如何设置接收器 JDBC 连接器的目标表名称?

问题描述 投票:0回答:1

我有一个 JDBC 接收器连接器从 Postgres 源连接器检索数据,并且我需要将数据从“parametros”源表复制到“parametros_sistema”目标表。 我正在使用 org.apache.kafka.connect.transforms.ReplaceField$Value 转换器将特定列从源定位到目标,因为我的列名称在源和目标中不同,但是当我启动接收器时连接器我收到此错误:

错误 [collaborator-sink-postgres-connector|task-0] WorkerSinkTask{id=collaborator-sink-postgres-connector-0} 任务引发了未捕获且不可恢复的异常。任务正在被终止,并且在手动重新启动之前不会恢复。错误:表“collab-service”。“public”。“parametros”丢失并且自动创建被禁用(org.apache.kafka.connect.runtime.WorkerSinkTask:586)

问题是我无法设置目标表名称,因为连接器正在尝试在目标数据库上创建与源表同名的表。有人知道如何解决这个问题吗?

P.S:源和目标都是 Postgres 数据库

我的源连接器:

debezium/debezium-connector-postgresql:2.2.1

name = collaborator-postgres-connector
connector.class = io.debezium.connector.postgresql.PostgresConnector
tasks.max = 1
topic.prefix = collab-service
database.hostname = host
database.server.name = collaborator-postgres-server
database.port = 5432
database.user = user
database.password = password
database.dbname = sflm_dev
plugin.name = pgoutput
slot.name = collab_test
decimal.handling.mode = double
snapshot.mode = always
schema.name.adjustment.mode = none
table.include.list = public.parametros
database.history.kafka.topic = postgres_history
database.history.kafka.bootstrap.servers = kafka:9092
message.prefix.include.list = after

我的水槽连接器:

confluenceinc/kafka-connect-jdbc:10.7.4

name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
insert.mode = upsert
pk.mode = record_key
transforms = timestampConverter,replaceField
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.field = para_dt_cadastro, para_dt_ult_alt
transforms.timestampConverter.target.type = Timestamp
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, para_dt_cadastro:pasi_dt_cadastro, para_dt_ult_alt:pasi_dt_ult_alt, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = para_cd_id

我的主题消息之一的示例:

[
    {
        "topic": "collab-service.public.parametros",
        "partition": 0,
        "offset": 1,
        "timestamp": 1704296444971,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": "Struct{para_cd_id=2}",
        "value": {
            "schema": {
                "type": "struct",
                "fields": [
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "before"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "after"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "version"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "connector"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "name"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "ts_ms"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "name": "io.debezium.data.Enum",
                                "version": 1,
                                "parameters": {
                                    "allowed": "true,last,false,incremental"
                                },
                                "default": "false",
                                "field": "snapshot"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "db"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "sequence"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "schema"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "table"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "txId"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "lsn"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "xmin"
                            }
                        ],
                        "optional": false,
                        "name": "io.debezium.connector.postgresql.Source",
                        "field": "source"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "op"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "id"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "total_order"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "data_collection_order"
                            }
                        ],
                        "optional": true,
                        "name": "event.block",
                        "version": 1,
                        "field": "transaction"
                    }
                ],
                "optional": false,
                "name": "collab-service.public.parametros.Envelope",
                "version": 1
            },
            "payload": {
                "before": null,
                "after": {
                    "para_cd_id": 2,
                    "para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
                    "para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
                    "para_tx_valor": "collaborator-364516",
                    "para_tx_tipo": "SISTEMA",
                    "para_dt_cadastro": 1669047531542921,
                    "para_dt_ult_alt": 1669047531542921,
                    "usua_cd_id_cadastro": null,
                    "usua_cd_id_ult_alt": null,
                    "para_tx_sistema": "R2D2"
                },
                "source": {
                    "version": "2.2.1.Final",
                    "connector": "postgresql",
                    "name": "collab-service",
                    "ts_ms": 1704296442190,
                    "snapshot": "last",
                    "db": "sflm_dev",
                    "sequence": "[null,\"17335024878048\"]",
                    "schema": "public",
                    "table": "parametros",
                    "txId": 989004,
                    "lsn": 17335024878048,
                    "xmin": null
                },
                "op": "r",
                "ts_ms": 1704296444402,
                "transaction": null
            }
        }
    }
]
java postgresql apache-kafka apache-kafka-connect confluent-platform
1个回答
0
投票

物业

table.name.format
完成了这项工作。

© www.soinside.com 2019 - 2024. All rights reserved.