我的 debezium 和 kafka 集成面临“不支持的源数据类型:STRUCT”错误

问题描述 投票:0回答:1

我在将 debezium 和 kafka 与 Oracle RDBMS 集成时遇到以下错误:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.

Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT

我使用的是独立的kafka,其属性文件的配置如下:

bootstrap.servers=localhost:9092

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=kafka_2.12-3.2.3/libs

我的Oracle Source Connector如下:

name=connector-test180
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
database.server.name=server1
database.hostname=***.**.0.1
database.port=1521
database.user=username
database.password=password
database.dbname=ORCLCDB
database.pdb.name=ORCLPDB1
database.connection.adapter=logminer
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.inventory
table.include.list=DEBEZIUM.CUSTOMER
column.include.list=DEBEZIUM.CUSTOMER.ID,DEBEZIUM.CUSTOMER.CUSTOMER_ID,DEBEZIUM.CUSTOMER.STATUS,DEBEZIUM.CUSTOMER.FIRSTNAME,\
DEBEZIUM.CUSTOMER.MOBILENUMBER,DEBEZIUM.CUSTOMER.FATHERNAME,DEBEZIUM.CUSTOMER.MOTHERNAME,DEBEZIUM.CUSTOMER.CITY,DEBEZIUM.CUSTOMER.COUNTRY
time.precision.mode=connect
transforms=filter,route
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.source.table == 'CUSTOMER'
transforms.filter.topic.regex=server1.DEBEZIUM.*
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3

Kafka 连接 Sinker 如下:

name=customer-sink139
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=CUSTOMER
connection.url=jdbc:mysql://localhost:3306/dbname
connection.user=user
connection.password=pass
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
auto.create=false
insert.mode=upsert
pk.mode=record_value
errors.tolerance=all
pk.fields=ID
auto.evolve=true

它也有奇怪的行为,直到昨天它都工作得很好,今天不工作,我没有做任何实质性的改变,并且还使用我以前工作正常的备份对其进行了测试。请大家帮忙解决一下。

尚未找到此查询的任何解决方案,在深入研究问题后,我怀疑这都是因为 field0,该字段是我的 oracle 数据库中的 ID 字段,具有此数据类型“ID NOT NULL NUMBER”,我不知道不知道为什么 debezium 将其包含在具有以下参数的结构中:

"type":"struct",
                   "fields":[
                      {
                         "type":"int32",
                         "optional":false,
                         "field":"scale"
                      },
                      {
                         "type":"bytes",
                         "optional":false,
                         "field":"value"
                      }
                   ],
mysql oracle apache-kafka apache-kafka-connect debezium
1个回答
0
投票

我使用了以下参数,它得到了解决

"converters": "numbertolong"
© www.soinside.com 2019 - 2024. All rights reserved.