如何在 Spark 结构化流中正确地将消息映射到具有 `schema` 和 `payload` 的结构?

问题描述 投票:0回答:0

我希望在 Spark 结构化流式传输期间将消息映射到具有

schema
payload
的结构。

这是我的原始代码

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

这将在写入 Kafka 时以这种格式输出消息:

{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}

但是,我希望添加一个字段

schema
并将其移动到
payload
以便以后我可以通过JDBC Sink Connector(例如Aiven的JDBC Sink and Source Connectors)下沉到TimescaleDB。

因为我将沉入 Postgres 的 TimescaleDB,基于 this doc,我认为我应该使用

"decimal"
作为每个字段类型。

所以这是我希望生成的Kafka消息格式:

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}

我尝试将我的 Spark 代码更新为

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

但是当我

spark-submit
时,我得到了错误

Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我有点感觉

StructType
导致它返回
DecimalType(10,0)
。也许在这种情况下我根本不应该使用
StructType

但我不确定如何在输出消息中准确生成

"decimal"
。任何指南将不胜感激,谢谢!

scala apache-spark apache-kafka spark-structured-streaming spark-kafka-integration
© www.soinside.com 2019 - 2024. All rights reserved.