How to correctly map messages to objects with `schema` and `payload` in Spark Structured Streaming?


During Spark Structured Streaming, I want to map messages to objects with `schema` and `payload` fields.

Here is my original code:

```scala
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

// Stream Parquet files from S3 and serialize each row to a JSON string
val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

// Write the JSON strings to Kafka
val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```

This writes messages to Kafka in this format:

```json
{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}
```

However, I want to add a `schema` field and move the original fields into a `payload` field, so that later I can sink the data into TimescaleDB through a JDBC sink connector (for example, Aiven's JDBC Sink and Source Connectors).

Because I will be sinking into TimescaleDB, which is built on Postgres, and based on this doc, I think I should use `"decimal"` as each field's type.

So this is the Kafka message format I hope to generate:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
```
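(For reference: downstream, the JDBC sink connector only accepts this `schema`/`payload` envelope when its JSON converter has schemas enabled. Below is a rough sketch of such a connector config; the connector class is Aiven's as I recall it, and all hosts, credentials, and names are placeholders, so check against Aiven's docs:)

```json
{
  "name": "my-timescaledb-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "topics": "my-topic",
    "connection.url": "jdbc:postgresql://timescaledb.example.com:5432/my-database",
    "connection.user": "my-user",
    "connection.password": "my-password",
    "insert.mode": "insert",
    "auto.create": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
```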

I tried updating my Spark code to:

```scala
import org.apache.spark.sql.functions.{col, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      // lit(field.dataType) passes a DataType object to lit(), which has no
      // literal representation -- this is what throws at runtime (see below)
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```

However, when I `spark-submit` this, I get the error:

```
Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

I have a feeling that `StructType` is what turns `"decimal"` into `DecimalType(10,0)`. Maybe I shouldn't use `StructType` in this case.

I am not sure how to generate exactly `"decimal"` in the output message. Any guidance would be appreciated, thanks!

scala apache-spark apache-kafka spark-structured-streaming spark-kafka-integration
1 Answer

I think my original guess was correct: `StructType` is special and shouldn't be used here. Passing the string `"decimal"` to `StructType.add` is the same as using `DecimalType(10,0)`, and `lit()` cannot build a literal column from a `DecimalType` object, which is why it shows that error.
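To make the failure concrete, here is a minimal illustration (my own sketch, e.g. for `spark-shell`, not part of the original code):

```scala
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.DecimalType

// lit() only supports plain literal values (String, Boolean, numbers, ...).
// Passing a DataType object throws the SparkRuntimeException seen above:
lit(DecimalType(10, 0)) // throws: literal for 'DecimalType(10,0)' ... not supported

// A plain string is fine; this is what the working version below relies on:
lit("decimal") // Column holding the literal string "decimal"
```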

In my case, I should simply use `struct`. Here is the working version:

```scala
import org.apache.spark.sql.functions.{array, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    // "schema": a hard-coded description of the payload, built entirely from
    // string/boolean literals so that lit() never receives a DataType object
    struct(
      lit("struct").alias("type"),
      array(
        struct(
          lit("decimal").alias("type"),
          lit(false).alias("optional"),
          lit("timestamp").alias("field")
        ),
        struct(
          lit("decimal").alias("type"),
          lit(true).alias("optional"),
          lit("current").alias("field")
        ),
        struct(
          lit("decimal").alias("type"),
          lit(true).alias("optional"),
          lit("voltage").alias("field")
        ),
        struct(
          lit("decimal").alias("type"),
          lit(true).alias("optional"),
          lit("temperature").alias("field")
        )
      ).alias("fields")
    ).alias("schema"),
    // "payload": the original row
    struct("*").alias("payload")
  )
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

```

This produces messages in the format:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
```
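As a side note, the four hand-written `struct(...)` blocks can be derived from a list instead. Here is a sketch of that variation (my own, untested, not part of the original answer); the key point is that `lit()` only ever receives plain strings and booleans, never a `DataType`:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array, lit, struct}

// (field name, Kafka Connect type string, optional) as plain Scala values
val fieldSpecs: Seq[(String, String, Boolean)] = Seq(
  ("timestamp", "decimal", false),
  ("current", "decimal", true),
  ("voltage", "decimal", true),
  ("temperature", "decimal", true)
)

val schemaColumn: Column = struct(
  lit("struct").alias("type"),
  array(
    fieldSpecs.map { case (name, connectType, optional) =>
      struct(
        lit(connectType).alias("type"),
        lit(optional).alias("optional"),
        lit(name).alias("field")
      )
    }: _*
  ).alias("fields")
).alias("schema")

// Use it in place of the hand-written struct above:
// .select(schemaColumn, struct("*").alias("payload"))
// .select(to_json(struct("*")).alias("value"))
```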