I am getting the following error in Kafka Connect when using the JDBC sink connector with Postgres.
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'sink-postgres-json-distributed' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='Json_test',partition=0,offset=0,timestamp=1695833816061) with a HashMap value and null value schema.
The connector configuration is as follows:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=<>
tasks.max=1
topics=Json_test
"transforms.flatten.type"="org.apache.kafka.connect.transforms.Flatten$Value"
"transforms"="flatten"
key.converter.schemas.enable=false
auto.evolve=true
connection.user=<>
value.converter.schemas.enable=false
auto.create=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=<>
insert.mode=upsert
"transforms.flatten.delimiter"="_"
key.converter=org.apache.kafka.connect.storage.StringConverter
pk.mode=none
pk.fields=__connect_topic,__connect_partition,__connect_offset
The source data looks like this (key, then value): 1981 {"名称":"生产491","时间":1695836275352}
The source value is produced with a POJO serializer.
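The JDBC sink does not accept plain JSON. It requires a schema, so you cannot set value.converter.schemas.enable=false. If you want to keep JsonConverter, set value.converter.schemas.enable=true and have the producer write every message as an envelope with top-level "schema" and "payload" fields.
Otherwise, change the produced data to a binary format that always carries a schema, such as Avro or Protobuf.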
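For the JSON route, the sketch below shows roughly what a message has to look like before JsonConverter (with schemas.enable=true) can hand the sink a Struct. It is only an illustration: the helper name wrap_with_schema is hypothetical, the record is the one from the question, every field is assumed to be optional, and the type mapping covers only flat records of booleans, integers, floats and strings.

```python
import json

# Python type -> Kafka Connect JSON schema type name (subset, assumed mapping).
CONNECT_TYPES = {bool: "boolean", int: "int64", float: "double", str: "string"}


def wrap_with_schema(payload: dict) -> dict:
    """Wrap a flat record in a {"schema": ..., "payload": ...} envelope.

    Hypothetical helper for illustration; nested values are not handled.
    """
    fields = [
        {"field": name, "type": CONNECT_TYPES[type(value)], "optional": True}
        for name, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "optional": False, "fields": fields},
        "payload": payload,
    }


# The value from the question, wrapped so that it carries its own schema.
record = {"名称": "生产491", "时间": 1695836275352}
envelope = wrap_with_schema(record)
print(json.dumps(envelope, ensure_ascii=False))
```

The producer would send the printed JSON as the message value instead of the bare object. Repeating the schema in every message adds overhead, which is one reason to prefer Avro or Protobuf with a schema registry.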
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/