Error in Kafka JDBC Postgres sink connector (sink connector is configured with 'delete.enabled=false' and 'pk.mode=none')

Problem description (votes: 0, answers: 1)

I am getting the following error in Kafka Connect with the JDBC Postgres sink connector.

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'sink-postgres-json-distributed' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='Json_test',partition=0,offset=0,timestamp=1695833816061) with a HashMap value and null value schema.
        
    

The connector configuration is as follows:

    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    connection.url=<>
    connection.user=<>
    connection.password=<>
    tasks.max=1
    topics=Json_test
    transforms=flatten
    transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
    transforms.flatten.delimiter=_
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    auto.create=true
    auto.evolve=true
    insert.mode=upsert
    pk.mode=none
    pk.fields=__connect_topic,__connect_partition,__connect_offset
    

The source data is as follows (key, then value):

    1981    {"name":"prod491","time":1695836275352}

The source value is serialized from a POJO.
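For context, a minimal sketch of how such a record might be produced; the POJO class, broker address, and Jackson-based serialization are assumptions, not taken from the question:

    import java.util.Properties;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class JsonTestProducer {
        // Hypothetical POJO matching the source value shown above.
        static class Event {
            public String name = "prod491";
            public long time = 1695836275352L;
        }

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Serializing the POJO to plain JSON: the resulting bytes carry no
                // schema, which is why the JDBC sink rejects the record below.
                String value = new ObjectMapper().writeValueAsString(new Event());
                producer.send(new ProducerRecord<>("Json_test", "1981", value));
            }
        }
    }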

json jdbc apache-kafka apache-kafka-connect
1 Answer

0 votes

The JDBC Sink does not accept plain JSON. It requires a schema, so you cannot set value.converter.schemas.enable=false.
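Concretely, if you keep the JsonConverter, set value.converter.schemas.enable=true and produce each value as a schema/payload envelope. A minimal sketch of what the record above would need to look like (field names inferred from the source data; the struct name is an arbitrary choice):

    {
      "schema": {
        "type": "struct",
        "name": "event",
        "optional": false,
        "fields": [
          { "field": "name", "type": "string", "optional": true },
          { "field": "time", "type": "int64", "optional": true }
        ]
      },
      "payload": { "name": "prod491", "time": 1695836275352 }
    }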

Otherwise, you should change the produced data to a binary format that always carries a schema, such as Avro or Protobuf.
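For example, switching the value side to Avro would look roughly like this (a sketch assuming a Confluent Schema Registry at a hypothetical localhost:8081):

    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

The producer would then need to use the matching Avro serializer instead of writing JSON strings.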

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
