I am trying to figure out what Kafka record header format the Apicurio Registry deserializer expects.
Artifact ID:
hm.motor-value
curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value'
returns
{
  "type": "record",
  "namespace": "com.hongbomiao",
  "name": "motor",
  "fields": [
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "current",
      "type": "double"
    },
    {
      "name": "voltage",
      "type": "double"
    },
    {
      "name": "temperature",
      "type": "double"
    }
  ]
}
Here is my Spark code that generates Kafka records in Avro format:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    // Schema of the source parquet files (timestamp is stored as a double)
    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    // Fetch the Avro schema for the record value from Apicurio Registry
    val backend = HttpClientSyncBackend()
    val res = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts/hm.motor-value"
      )
      .send(backend)
    // Note: this uses the response body as the schema even when the request failed
    val kafkaRecordValueSchema = res.body.fold(identity, identity)

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      // Scale the timestamp by 1000 and cast to long to match the Avro schema
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      // Encode the whole row as Avro using the registry schema
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}
Here is my JDBC sink connector config:
{
  "name": "hm-motor-jdbc-sink-kafka-connector",
  "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 10,
  "topics": "hm.motor",
  "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
  "connection.user": "xxx",
  "connection.password": "xxx",
  "insert.mode": "insert",
  "table.name.format": "motor",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2",
  "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
}
The setup above works well. Now I would like to get rid of
"value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
in the Kafka connector config.
My topic name is currently hm.motor, and I would expect it to match the schema with artifact ID hm.motor-value.
Based on this doc:
TopicIdStrategy: Default strategy that uses the topic name and a key or value suffix.
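To make my expectation concrete, here is a minimal Scala sketch of how I read that naming rule. The object `TopicIdNaming` and the function `artifactIdFor` are hypothetical names I made up for illustration; they are not part of the Apicurio API and only mirror the quoted description (topic name plus a `-key` or `-value` suffix).

```scala
// Hypothetical helper, not an Apicurio class: it only mirrors the naming
// rule from the quoted docs (topic name plus a "key" or "value" suffix).
object TopicIdNaming {
  def artifactIdFor(topic: String, isKey: Boolean): String = {
    val suffix = if (isKey) "key" else "value"
    s"$topic-$suffix"
  }

  def main(args: Array[String]): Unit = {
    // For the record value of my topic, prints "hm.motor-value"
    println(artifactIdFor("hm.motor", isKey = false))
  }
}
```

Under that reading, the value schema for topic hm.motor should resolve to artifact ID hm.motor-value, which is the artifact I already registered.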
However, if I now remove
"value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
from my JDBC sink connector config, I get this error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: artifactId cannot be null
at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByCoordinates(DefaultSchemaResolver.java:177)
at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByArtifactReference(DefaultSchemaResolver.java:172)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.resolve(AbstractKafkaDeserializer.java:147)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.readData(AbstractKafkaDeserializer.java:136)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:131)
at io.apicurio.registry.utils.converter.SerdeBasedConverter.toConnectData(SerdeBasedConverter.java:139)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Any ideas? Thanks!