Passing a Spark Structured Streaming DataFrame to a function


I am reading a Spark Structured Streaming DataFrame from a Kafka source. I want to pass this DataFrame to a function and write the function's result to some sink.

case class JsonSchema(Column1: String, column2: Long)

case class subJsonSchema(col: String)

def alterTable(rdd: RDD[JsonSchema], spark: SparkSession): DataFrame = {
  spark.createDataFrame(rdd.map(x => subJsonSchema(x.Column1)))
}

import org.apache.spark.sql.Encoders
val jschema = Encoders.product[JsonSchema].schema
val stream = spark
      .readStream
      .format("kafka")
      .options(kafkaParams)
      .load()
val streamingDF = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
      .select(from_json($"value", jschema) as "value").select($"value.*").as[JsonSchema].rdd
alterTable(streamingDF,spark).writeStream.outputMode("append").format("console").start()

This throws:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
apache-spark pyspark spark-streaming spark-structured-streaming apache-spark-dataset
1 Answer

Converting a DataFrame to an RDD is not supported in structured streaming, which means the following code will not execute:

val streamingDF = 
stream
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.select(from_json($"value", jschema) as "value")
.select($"value.*")
.as[JsonSchema]
.rdd

Note: use structured streaming functions, and if you cannot find a suitable function in the Spark library, write your own UDF.
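For instance, the projection that alterTable performs through an RDD can be expressed with plain DataFrame operations, which keep the query streaming-safe. A minimal sketch, assuming the stream, jschema, and column names from the question (the name projected is illustrative):

```scala
// Sketch: the same projection as alterTable, but done with streaming-safe
// DataFrame operations instead of converting to an RDD.
val projected = stream
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", jschema) as "value")
  .select($"value.Column1".as("col"))   // equivalent of subJsonSchema(x.Column1)

projected.writeStream
  .outputMode("append")
  .format("console")
  .start()
```

Because no RDD conversion happens, writeStream.start() works directly on the result.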

If you still want to use RDD functionality, try the foreachBatch or foreach sink.
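With foreachBatch, each micro-batch arrives as an ordinary (non-streaming) Dataset, so batch-only APIs such as .rdd are allowed inside the callback. A hedged sketch, assuming streamingDS is the Dataset[JsonSchema] from the question before the .rdd call:

```scala
// Sketch of the foreachBatch sink: 'batch' is a plain Dataset here,
// so converting it to an RDD and calling alterTable is legal.
streamingDS.writeStream
  .foreachBatch { (batch: Dataset[JsonSchema], batchId: Long) =>
    val result = alterTable(batch.rdd, spark)
    result.write.format("console").save() // or any batch sink
  }
  .start()
```

The trade-off is that the write inside foreachBatch uses the batch writer API, so you manage idempotence across retries yourself (batchId can help with deduplication).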
