I am getting the following error while integrating Debezium and Kafka with an Oracle RDBMS:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
I am running Kafka in standalone mode, and its properties file is configured as follows:
bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=kafka_2.12-3.2.3/libs
My Oracle source connector is as follows:
name=connector-test180
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
database.server.name=server1
database.hostname=***.**.0.1
database.port=1521
database.user=username
database.password=password
database.dbname=ORCLCDB
database.pdb.name=ORCLPDB1
database.connection.adapter=logminer
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.inventory
table.include.list=DEBEZIUM.CUSTOMER
column.include.list=DEBEZIUM.CUSTOMER.ID,DEBEZIUM.CUSTOMER.CUSTOMER_ID,DEBEZIUM.CUSTOMER.STATUS,DEBEZIUM.CUSTOMER.FIRSTNAME,\
DEBEZIUM.CUSTOMER.MOBILENUMBER,DEBEZIUM.CUSTOMER.FATHERNAME,DEBEZIUM.CUSTOMER.MOTHERNAME,DEBEZIUM.CUSTOMER.CITY,DEBEZIUM.CUSTOMER.COUNTRY
time.precision.mode=connect
transforms=filter,route
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.source.table == 'CUSTOMER'
transforms.filter.topic.regex=server1.DEBEZIUM.*
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
The Kafka Connect sink connector is as follows:
name=customer-sink139
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=CUSTOMER
connection.url=jdbc:mysql://localhost:3306/dbname
connection.user=user
connection.password=pass
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
auto.create=false
insert.mode=upsert
pk.mode=record_value
errors.tolerance=all
pk.fields=ID
auto.evolve=true
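The behavior is also strange: it worked fine until yesterday and stopped working today. I have not made any substantial changes, and I also tested it with a backup that previously worked. Any help in resolving this would be appreciated.
I have not found a solution to this yet. After digging into the problem, I suspect it is caused by field0, which is the ID field in my Oracle database, declared as "ID NOT NULL NUMBER". I don't know why Debezium wraps it in a struct with the following parameters: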
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"scale"
},
{
"type":"bytes",
"optional":false,
"field":"value"
}
],
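For context, a struct with an int32 `scale` field and a bytes `value` field appears to match the way Debezium represents a NUMBER column declared without precision or scale (its variable-scale decimal form), and the JDBC sink cannot map a nested struct to a column. The following is a minimal Python sketch of how such a value could be decoded, assuming `value` arrives base64-encoded (as JsonConverter does for bytes) and holds the big-endian two's-complement unscaled integer. The function name and the sample payload are hypothetical and only for illustration.

```python
import base64
from decimal import Decimal


def decode_variable_scale_decimal(scale: int, value_b64: str) -> Decimal:
    """Decode a {scale, value} struct into an exact Decimal.

    Assumption: value_b64 is the base64 text of the big-endian,
    two's-complement unscaled integer.
    """
    raw = base64.b64decode(value_b64)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(ch) for ch in str(abs(unscaled)))
    # Build from a (sign, digits, exponent) tuple so no context rounding occurs.
    return Decimal((sign, digits, -scale))


# Hypothetical payload for an ID column; not taken from the original topic.
sample = {"scale": 0, "value": "B1s="}
print(decode_variable_scale_decimal(sample["scale"], sample["value"]))
```

Decoding on the consumer side is only useful for inspecting the topic. For the sink to work, the source connector has to emit the column as a primitive type instead of a struct.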
I used the following parameter, and that resolved it:
"converters": "numbertolong"