我是火花新手
我正在尝试使用 Spark Stream 阅读 kafka 主题。
从 Kafka 流式传输的数据的“value”字段是一个 json 字符串。 我想将此“值”字段转换为数据框 并将其更改为镶木地板文件。
我想从值字段中包含的字符串值获取架构信息。 原因,继续添加JSON数据字段
例如 kafka 数据是这样的。
键 | 价值 | ... |
---|---|---|
0 | “{a:1, b:2, c:3}..” | ... |
1 | “{a:1,b:2,c:3,d:4}..” | ... |
我正在尝试这个代码
source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
.select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
.select("data.*")
我有错误 pyspark.sql.utils.AnalysisException:具有流源的查询必须使用 writeStream.start() 执行;
请帮忙
选项 1:对架构进行硬编码并在
F.from_json()
中使用它。
my_schema = T.StructType([
T.StructField('a', T.IntegerType()),
T.StructField('b', T.IntegerType()),
T.StructField('c', T.IntegerType()),
T.StructField('d', T.IntegerType()),
])
value = F.col('value').cast(T.StringType())
data = F.from_json(value, my_schema).alias('data')
source_df = streaming_data.select(data).select('data.*')
foreachbatch
。但请注意,这是有风险的,破坏架构更改将使流式查询失败。此外,也不能保证能够正确推断模式。
def parse_and_process(df: DataFrame, epoch_id: int) -> None:
# cache the current micro batch, it will be scanned more than once
df.persist()
# infer the schema of the current batch
spark = SparkSession.getActiveSession()
value = F.col('value').cast(T.StringType())
inferred_df = spark.read.json(
df.select(value).rdd.map(lambda x: x[0]),
dropFieldIfAllNull=True
)
inferred_schema = inferred_df.schema
# parse the json with the schema
res_df = df.withColumn('data', F.from_json(value, inferred_schema))
# process the DataFramee, it's not a streaming DataFrame anymore.
res_df.write....
df.unpersist()
streaming_data.writeStream.foreachBatch(parse_and_process).start()