I am reading a Spark Structured Streaming DataFrame from a Kafka source. I want to pass this DataFrame to a function and write the function's result to some sink.
case class JsonSchema(Column1: String, column2: Long)
case class subJsonSchema(col: String)

def alterTable(rdd: RDD[JsonSchema], spark: SparkSession): DataFrame = {
  spark.createDataFrame(rdd.map(x => subJsonSchema(x.Column1)))
}
import org.apache.spark.sql.Encoders
val jschema = Encoders.product[JsonSchema].schema
val stream = spark
.readStream
.format("kafka")
.options(kafkaParams)
.load()
val streamingDF = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.select(from_json($"value", jschema) as "value").select($"value.*").as[JsonSchema].rdd
alterTable(streamingDF, spark).writeStream.outputMode("append").format("console").start()
This throws:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Converting a DataFrame to an RDD is not supported in Structured Streaming, which means the following code will not run:
val streamingDF =
stream
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.select(from_json($"value", jschema) as "value")
.select($"value.*")
.as[JsonSchema]
.rdd
Note: if you cannot find a suitable function in the Spark library, use the Structured Streaming functions or write your own UDF.
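For this particular alterTable no RDD (and no UDF) is actually needed: the projection it performs can be expressed directly on the streaming Dataset, so the result stays streaming and writeStream keeps working. A minimal sketch, reusing stream, jschema, and the case classes from the question (the original imports for from_json and spark.implicits._ are assumed):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Streaming-friendly alterTable: a Dataset-to-DataFrame transformation
// instead of an RDD-based one, so the output is still a streaming DataFrame.
def alterTable(ds: Dataset[JsonSchema], spark: SparkSession): DataFrame = {
  import spark.implicits._
  ds.map(x => subJsonSchema(x.Column1)).toDF()
}

// Same query as in the question, but stopping at .as[JsonSchema] (no .rdd):
val streamingDS = stream
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", jschema) as "value")
  .select($"value.*")
  .as[JsonSchema]

alterTable(streamingDS, spark).writeStream
  .outputMode("append")
  .format("console")
  .start()
```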
If you still need RDD functionality, try the foreachBatch or foreach sinks.
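With foreachBatch, each micro-batch arrives as an ordinary (non-streaming) Dataset, so the original RDD-based alterTable can be used unchanged. A sketch under the same assumptions as the question (stream, jschema, spark, and the original imports in scope):

```scala
// Keep the query streaming: no .rdd at the end.
val streamingDS = stream
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", jschema) as "value")
  .select($"value.*")
  .as[JsonSchema]

streamingDS.writeStream
  .foreachBatch { (batch: Dataset[JsonSchema], batchId: Long) =>
    // Inside foreachBatch the Dataset is static, so .rdd is legal here
    // and the question's RDD-based alterTable works as-is.
    alterTable(batch.rdd, spark).write.format("console").save()
  }
  .start()
```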