I'd like to map messages into a struct with `schema` and `payload` fields during Spark Structured Streaming.

Here is my original code:
```scala
val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```
This writes messages to Kafka in this format:
```json
{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}
```
However, I'd like to add a `schema` field and move the existing fields under `payload`, so that later I can sink the messages into TimescaleDB through a JDBC sink connector (e.g., Aiven's JDBC Sink and Source Connectors).

Since I'll be sinking into TimescaleDB, which is built on Postgres, and based on this doc, I think I should use `"decimal"` as the type for each field.
So this is the Kafka message format I'd like to produce:
```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
```
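For context, this `schema`/`payload` envelope is the shape the Kafka Connect `JsonConverter` expects when `value.converter.schemas.enable` is `true`. A minimal sketch of the sink connector config I have in mind (the connector class name, connection URL, and database names here are my assumptions based on Aiven's docs, not a tested config):

```json
{
  "name": "timescaledb-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "topics": "my-topic",
    "connection.url": "jdbc:postgresql://timescaledb:5432/mydb",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "auto.create": "true"
  }
}
```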
I tried updating my Spark code to:
```scala
val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```
But when I run it with `spark-submit`, I get this error:
```
Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
My feeling is that using `StructType` is what causes it to return `DecimalType(10,0)`. Maybe I shouldn't be using `StructType` at all in this case?

But I'm not sure how to produce exactly `"decimal"` in the output message. Any guidance would be appreciated, thanks!