如何在没有 json Schema 的情况下使用 SparkStream 从 Kafka 获取 Dataframe

问题描述 投票:0回答:1

我是火花新手

我正在尝试使用 Spark Stream 阅读 kafka 主题。

从 Kafka 流式传输的数据的“value”字段是一个 json 字符串。 我想将此“值”字段转换为数据框 并将其更改为镶木地板文件。

我想从值字段中包含的字符串值获取架构信息。 原因,继续添加JSON数据字段

例如 kafka 数据是这样的。

价值 ...
0 “{a:1, b:2, c:3}..” ...
1 “{a:1,b:2,c:3,d:4}..” ...

我正在尝试这个代码

   source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
         .select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
         .select("data.*")

我有错误 pyspark.sql.utils.AnalysisException:具有流源的查询必须使用 writeStream.start() 执行;

请帮忙

pyspark spark-streaming
1个回答
0
投票

选项 1:对架构进行硬编码并在

F.from_json()
中使用它。

my_schema = T.StructType([
    T.StructField('a', T.IntegerType()),
    T.StructField('b', T.IntegerType()),
    T.StructField('c', T.IntegerType()),
    T.StructField('d', T.IntegerType()),
])
value = F.col('value').cast(T.StringType())
data = F.from_json(value, my_schema).alias('data')
source_df = streaming_data.select(data).select('data.*')

选项2:如果您想动态推断模式,可以使用

foreachbatch
。但请注意,这是有风险的,破坏架构更改将使流式查询失败。此外,也不能保证能够正确推断模式。

def parse_and_process(df: DataFrame, epoch_id: int) -> None:
    # cache the current micro batch, it will be scanned more than once
    df.persist()

    # infer the schema of the current batch
    spark = SparkSession.getActiveSession()

    value = F.col('value').cast(T.StringType())
    inferred_df = spark.read.json(
        df.select(value).rdd.map(lambda x: x[0]),
        dropFieldIfAllNull=True
    )
    inferred_schema = inferred_df.schema

    # parse the json with the schema
    res_df = df.withColumn('data', F.from_json(value, inferred_schema))

    # process the DataFramee, it's not a streaming DataFrame anymore.
    res_df.write....

    df.unpersist()


streaming_data.writeStream.foreachBatch(parse_and_process).start()
© www.soinside.com 2019 - 2024. All rights reserved.