How to correctly use a Spark -> Kafka -> JDBC sink connector with Avro?

Votes: 0 · Answers: 3

I have a simple Spark application that produces Kafka messages:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*")).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

I have an Avro schema in Apicurio Registry, created with:
curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "hm.motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'

I am trying to use Apicurio Registry's Confluent-compatible REST API endpoint. Currently, retrieving the schema by content ID 26 with

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
  --header 'Content-type: application/json; artifactType=AVRO' \
  --header 'X-Registry-ArtifactId: hm-iot'

prints

{
    "schema": "{\n    \"type\": \"record\",\n    \"namespace\": \"com.hongbomiao\",\n    \"name\": \"hm.motor\",\n    \"fields\": [\n        {\n            \"name\": \"timestamp\",\n            \"type\": \"long\"\n        },\n        {\n            \"name\": \"current\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"voltage\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"temperature\",\n            \"type\": \"double\"\n        }\n    ]\n}",
    "references": []
}

Looks good.

Based on Aiven's JDBC connector documentation, I wrote my JDBC sink connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",

        "insert.mode": "upsert",

        "table.name.format": "motor",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",

        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

However, I get this error in the Kafka Connect log:

2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    ... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
    ... 21 more

It is trying to fetch content ID -1330532454, but obviously I don't have that one; mine is 26. How does the JDBC sink connector look up the corresponding Avro schema?

I am not sure how the mapping works right now. I thought it would look for a schema named hm.motor based on the Kafka topic, but it turns out that is not the case.

Thanks!


Update 1

Thanks @Ftisiot!

I found this in the documentation for the Kafka serializers and deserializers:

Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject names when registering or retrieving schemas.

Similarly, value.converter.value.subject.name.strategy defaults to io.confluent.kafka.serializers.subject.TopicNameStrategy.
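In other words (a tiny illustration of that strategy, not Confluent's actual implementation), the subject used for registry lookups is derived purely from the topic name:

// Illustration only: simplified sketch of what TopicNameStrategy produces
def subjectFor(topic: String, isKey: Boolean): String =
  if (isKey) s"$topic-key" else s"$topic-value"

subjectFor("hm.motor", isKey = false) // "hm.motor-value"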

I have updated my Avro schema name to hm.motor-value, but I still get the same error.

apache-spark apache-kafka apache-kafka-connect avro aiven
3 Answers

0 votes

I believe the default schema name will be the concatenation of the topic name and -value or -key, depending on which part of the message you are decoding.

Therefore, in your case, I would try the schema name hm.motor-value.

In this video you can see the schema name that gets generated automatically when encoding from JSON to Avro with Flink.

Disclaimer: I work for Aiven, and we should update the documentation to reflect this.


0 votes

Forget about Connect for a moment. You should first debug your topic with kafka-avro-console-consumer. You will get the same error there, because your producer needs to encode the data correctly.

Spark's to_avro does not do that.

Check out the toConfluentAvro functionality of this library: https://github.com/AbsaOSS/ABRiS
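Roughly, the usage looks like this (a sketch based on the ABRiS README; the builder method names can differ between ABRiS versions, so check the README for the version you pin, and the registry URL here is just the Apicurio ccompat endpoint from this thread; df is assumed to still hold the original columns, i.e. before the plain to_avro select):

import org.apache.spark.sql.functions.struct
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.AbrisConfig

// ABRiS' to_avro, unlike org.apache.spark.sql.avro.functions.to_avro, prepends the
// Confluent wire-format header (magic byte + schema ID) to every record value.
val abrisConfig = AbrisConfig
  .toConfluentAvro
  .downloadSchemaByLatestVersion
  .andTopicNameStrategy("hm.motor")
  .usingSchemaRegistry("http://apicurio-registry.svc:8080/apis/ccompat/v6")

val confluentAvroDf = df.select(to_avro(struct("*"), abrisConfig).alias("value"))
// then write confluentAvroDf to Kafka exactly as in the question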

More details on the internals: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
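In short: in the Confluent wire format every value starts with a magic byte 0x0 and a 4-byte schema ID, followed by the Avro-encoded body. A minimal sketch of how that header is read (illustration only, not the connector's actual code):

import java.nio.ByteBuffer

// Confluent-framed record value: [magic byte 0x0][4-byte big-endian schema ID][Avro body]
def readSchemaId(value: Array[Byte]): Int = {
  val buf = ByteBuffer.wrap(value)
  require(buf.get() == 0, "not Confluent wire format (unexpected magic byte)")
  buf.getInt() // the ID the converter then asks the schema registry for
}

// A value produced by plain org.apache.spark.sql.avro.functions.to_avro carries no such
// header, so the deserializer ends up treating raw Avro bytes as a schema ID, which is
// how a bogus ID like -1330532454 shows up in the error.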

Regarding your schema question: name refers to the fully qualified Java class name defined by the Avro specification, and it has nothing to do with the registry subject when TopicNameStrategy is used.

What is this subject name? It is the path parameter in the API call POST /subjects/:name/versions/, used by the serializer's and deserializer's internal HTTP client.

As mentioned before, Kafka Connect is not needed here at all: Spark can write directly to a JDBC database, and the source can be either Parquet or Kafka.
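For example, a hedged sketch of that route, reusing the connection details from this thread and assuming df is the streaming DataFrame with its original columns (before the to_avro select); reading credentials from environment variables is my assumption, and the PostgreSQL JDBC driver must be on the classpath:

import org.apache.spark.sql.DataFrame

// Write each micro-batch straight to the motor table in TimescaleDB/Postgres,
// skipping Kafka and Kafka Connect entirely.
val writeBatchToJdbc: (DataFrame, Long) => Unit = (batchDF, _) =>
  batchDF.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db")
    .option("dbtable", "motor")
    .option("user", sys.env("TIMESCALEDB_USER"))         // assumption: credentials via env vars
    .option("password", sys.env("TIMESCALEDB_PASSWORD")) // assumption
    .mode("append")
    .save()

val query = df.writeStream
  .foreachBatch(writeBatchToJdbc)
  .option("checkpointLocation", "/tmp/checkpoint-jdbc")
  .start()

query.awaitTermination()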


0 votes

Thanks everyone for the help, I finally figured it out! Here is what I learned.

1. Producing Kafka messages in Avro format

There are actually two main flavors of Avro data:

  • "standard" / "vanilla" Apache Avro
  • Confluent Avro

1.1 [Succeeded] Producing "standard" / "vanilla" Apache Avro data in Spark

First, I created my Avro schema with:
curl --location 'http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm.motor-value' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'

In Spark, this is very straightforward with the native org.apache.spark.sql.avro.functions.to_avro:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    // For `parquet_schema` below, you can
    //   1. hard-code it like the current code
    //   2. read it from one file: `val parquet_schema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema`
    //   3. maybe also derive it from Avro, which I will try in the future!
    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val backend = HttpClientSyncBackend()
    val response = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value"
      )
      .send(backend)
    val schemaString = response.body.fold(identity, identity)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), schemaString).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

build.sbt

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-avro" % "3.3.2" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.461" % "provided",
  "com.softwaremill.sttp.client3" %% "core" % "3.8.15"
)

I got a lot of ideas from this article.

1.2 Producing Confluent Avro data in Spark

Confluent Avro is not "standard" / "vanilla" Avro, which brings some inconvenience to Spark and other tools.

There is a library, ABRiS, that helps produce Kafka messages in Confluent Avro format (toConfluentAvro).

However, sbt assembly was another nightmare with ABRiS; I had to deal with assemblyMergeStrategy. 🥲
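For reference, the kind of merge-strategy stanza involved looks roughly like this (a rough sketch only, not the exact rules I ended up with; the right rules depend on which files in the fat jar actually conflict):

// Rough sketch with sbt-assembly's slash syntax; tune per the actual merge conflicts.
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", _*) => MergeStrategy.concat  // keep service registrations
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}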

(I did not go further in this direction.)

2. Reading Avro-format Kafka messages in the JDBC sink connector and writing them to the database

2.1 [Succeeded] Kafka messages in "standard" / "vanilla" Apache Avro

Very straightforward, just use io.apicurio.registry.utils.converter.AvroConverter.

My JDBC connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
        "insert.mode": "upsert",
        "table.name.format": "motor",
        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp",

        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2"
        "value.converter.apicurio.registry.fallback.group-id": "hm-group",
        "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
    }
}

Maybe in the future I can figure out a way to get rid of the value.converter.apicurio.registry.fallback related fields.

More information about io.apicurio.registry.utils.converter.AvroConverter can be found here.

2.2 Kafka messages in Confluent Avro

2.2.1 Using io.confluent.connect.avro.AvroConverter with Apicurio Registry

Here we use Apicurio Registry's Confluent-compatible REST API:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",

(I did not go further in this direction.)

2.2.2 Using io.confluent.connect.avro.AvroConverter with Confluent Schema Registry

Here we use the Confluent Schema Registry REST API:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://confluent-schema-registry.svc:8081",

(I did not go further in this direction.)
