How does a JDBC sink connector find the corresponding AVRO schema?


I have an AVRO schema in Apicurio Registry.

I am trying to use Apicurio Registry's Confluent-compatible REST API endpoints. Currently I am retrieving the schema by content ID 6:

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/6' \
  --header 'Content-type: application/json; artifactType=AVRO' \
  --header 'X-Registry-ArtifactId: hm-iot'

This prints:

{
    "schema": "{\n    \"type\": \"record\",\n    \"namespace\": \"com.hongbomiao\",\n    \"name\": \"motor\",\n    \"fields\": [\n        {\n            \"name\": \"timestamp\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"current\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"voltage\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"temperature\",\n            \"type\": \"double\"\n        }\n    ]\n}",
    "references": []
}

Looks good.

Based on Aiven's JDBC connector documentation, I wrote my JDBC sink connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",

        "insert.mode": "upsert",

        "table.name.format": "motor",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",

        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

However, I am getting this error in the Kafka Connect log:

2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    ... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
    ... 21 more

It is trying to fetch content ID -1330532454, but apparently I don't have that one; mine is at 6. How does the JDBC sink connector look up the corresponding AVRO schema?
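
For what it's worth, the Confluent AvroConverter does not derive the schema from the topic or table name on the consume side: it expects each message value to be in the Confluent wire format, i.e. one magic byte 0x00 followed by a 4-byte big-endian schema ID, and it looks that ID up in the registry. If the producer did not write that header, the converter reads four arbitrary payload bytes as the ID, which is one way to end up with a nonsense value like -1330532454. A minimal way to check is to hex-dump the first bytes of one record; this sketch assumes kcat is installed, and the broker address is a placeholder:

# Dump the first bytes of one record's value from hm.motor.
# A value in the Confluent wire format starts with magic byte 0x00
# followed by the 4-byte schema ID (for ID 6: 00 00 00 00 06).
# kafka.hm-kafka.svc:9092 is hypothetical; substitute your broker address.
kcat -b kafka.hm-kafka.svc:9092 -t hm.motor -c 1 -e -f '%s' | xxd | head -n 2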

I am not sure how it does the mapping. Based on the config "table.name.format": "motor", I thought it would look for a schema named motor, but it turns out that is not the case.

Thanks!


Update 1

I renamed my AVRO schema from motor to hm.motor.

Then, in my connector config, I added value.converter.value.subject.name.strategy:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",

After this, I expected the connector to find the corresponding AVRO schema based on the topic name. However, after redeploying the connector, I still get exactly the same error.
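
For reference, here is how the Confluent subject name strategies map to subjects for this topic (assuming the record name com.hongbomiao.motor from the schema above); note that TopicRecordNameStrategy includes the record name, so it would not look up plain hm.motor:

# Subject names the Confluent serializers derive for topic "hm.motor"
# and Avro record com.hongbomiao.motor:
#   TopicNameStrategy (default):  hm.motor-value
#   RecordNameStrategy:           com.hongbomiao.motor
#   TopicRecordNameStrategy:      hm.motor-com.hongbomiao.motor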

It is still trying to find the AVRO schema for content ID -1330532454, and I am not sure where that number comes from.

1 Answer

I believe the default schema subject name is the topic name concatenated with -value or -key, depending on which part of the message you are decoding.

So, in your case, I would try the schema name hm.motor-value.
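
One way to check what the registry actually has is to query the same ccompat API for its subjects, e.g. (assuming the registry URL from your question):

# List every subject the registry knows about
curl http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/subjects

# Fetch the latest schema registered under the suggested subject
curl http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/subjects/hm.motor-value/versions/latest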

In this video you can see the schema names that are generated automatically when encoding from JSON to Avro with Flink.

Disclaimer: I work for Aiven, and we should update the documentation to reflect this.
