Apicurio Registry deserializer fails with "artifactId cannot be null"


I am trying to figure out what Kafka record header format Apicurio Registry's deserializer expects.

My Avro schema in Apicurio Registry

I have an artifact with the ID hm.motor-value.

curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value'

returns:

{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}
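
For reference, the artifact metadata endpoint (as I understand the v2 REST API, so the exact response fields may vary by version) also returns the numeric globalId that the serde layer uses to reference this schema version:

curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value/meta'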

My Spark code

Here is my Spark code that produces Kafka records in Avro format:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"
    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    // Fetch the value schema text from Apicurio Registry over HTTP
    val backend = HttpClientSyncBackend()
    val res = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts/hm.motor-value"
      )
      .send(backend)
    val kafkaRecordValueSchema = res.body.fold(identity, identity)

    // Stream Parquet files from S3 and encode each row as Avro using the fetched schema
    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}
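
As far as I can tell, to_avro emits only the raw Avro binary with no registry ID attached, so the deserializer may have nothing to resolve the schema from. One idea is to attach the global ID as a Kafka record header through the sink's optional headers column. This is only a sketch: the header name apicurio.value.globalId and the 8-byte big-endian encoding are my reading of Apicurio's default headers handler, not something I have confirmed, and the globalId value here is a placeholder:

import java.nio.ByteBuffer
import org.apache.spark.sql.functions.{array, lit, struct}

// Placeholder: the schema version's globalId, e.g. from the /meta endpoint above
val globalId: Long = 1L
val globalIdBytes: Array[Byte] = ByteBuffer.allocate(8).putLong(globalId).array()

// The Kafka sink accepts an optional headers column of type array<struct<key: string, value: binary>>
val dfWithHeaders = df.withColumn(
  "headers",
  array(
    struct(
      lit("apicurio.value.globalId").alias("key"),
      lit(globalIdBytes).alias("value")
    )
  )
)

If I am reading the Spark Kafka integration guide correctly, a headers column is picked up automatically by the sink on Spark 3.x, so no extra write option should be needed.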

My JDBC connector config

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 10,
    "topics": "hm.motor",
    "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
    "connection.user": "xxx",
    "connection.password": "xxx",
    "insert.mode": "insert",
    "table.name.format": "motor",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2",
    "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
}

The code above works well. Now I would like to get rid of this line from my Kafka connector config:

"value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"

My Kafka topic name is currently hm.motor, and I expect it to match the schema with the artifact ID hm.motor-value.

Based on this doc:

TopicIdStrategy: Default strategy that uses the topic name and the key or value suffix.
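
For my topic, that should resolve to exactly my artifact ID (a trivial paraphrase of the doc, not Apicurio's actual implementation):

// TopicIdStrategy as I read the doc: artifact ID = topic name plus "-key" or "-value"
def valueArtifactId(topic: String): String = s"$topic-value"

valueArtifactId("hm.motor") // "hm.motor-value", which matches my artifact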

However, if I remove

"value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"

from my JDBC sink connector, I get this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: artifactId cannot be null
    at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByCoordinates(DefaultSchemaResolver.java:177)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByArtifactReference(DefaultSchemaResolver.java:172)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.resolve(AbstractKafkaDeserializer.java:147)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.readData(AbstractKafkaDeserializer.java:136)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:131)
    at io.apicurio.registry.utils.converter.SerdeBasedConverter.toConnectData(SerdeBasedConverter.java:139)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
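
Reading the trace, DefaultSchemaResolver appears to fall back to coordinate-based resolution because it finds no schema ID in the record, and the fallback artifact ID was the only coordinate I had been supplying. The other direction I can think of is embedding the ID in the payload itself. Again a sketch under unverified assumptions: that Apicurio's default binary framing is a 0x0 magic byte followed by an 8-byte big-endian global ID, and that parquetDf stands for the stream before the to_avro step in my job:

import java.nio.ByteBuffer
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{concat, lit, struct}

val globalId: Long = 1L // placeholder for the schema version's globalId
val globalIdBytes: Array[Byte] = ByteBuffer.allocate(8).putLong(globalId).array()

// Prepend the assumed framing to the Avro body; concat works on binary columns
val framed = parquetDf.select(
  concat(
    lit(Array(0x0.toByte)), // magic byte
    lit(globalIdBytes),     // 8-byte big-endian global ID
    to_avro(struct("*"), kafkaRecordValueSchema)
  ).alias("value")
)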

Any ideas? Thanks!

scala apache-spark apache-kafka avro apicurio-registry