Pyspark: AnalysisException: Queries with streaming sources must be executed with writeStream.start();

Question · Votes: 0 · Answers: 2

I have several functions I want to run, but I get an error message telling me I need to use .start(). Can someone show me how to proceed? Here is part of my code:

def parse_data(df):
    cachedDf = df.cache()
    # Do some more logic
    # collect() is a batch action; calling it on a streaming DataFrame
    # raises the AnalysisException from the title
    return df.collect()

def convert_to_local_time(df):
    # Logic for converting to local time
    return df
    

raw_stream_df = spark.readStream.format("delta").load(raw_table_path)

df = parse_data(raw_stream_df)
df = convert_to_local_time(df)
pyspark spark-structured-streaming
2 Answers
0 votes

Here is a working example of how to do structured streaming correctly.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *


# SQLContext/SparkContext are deprecated; SparkSession is the modern entry point
spark = SparkSession.builder.master("local").getOrCreate()

data1 = [
    [1, 35, "Male", "2023-10-01", 200],
    [1, 35, "Male", "2023-10-02", 210],
    [2, 28, "Female", "2023-10-01", 150],
    [2, 28, "Female", "2023-10-02", 160],
]

columns = ["member_id", "age", "gender", "date", "cost"]

df1 = spark.createDataFrame(data=data1, schema=columns)

df1.show(n=10, truncate=False)

# Write the batch data to disk so it can serve as a file source for the stream
tempdir = "../data/structstream/"
df1.write.format("csv").mode("overwrite").save(tempdir)

# File-based streaming sources require an explicit schema
dataframeSchema = StructType([
    StructField("member_id", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("gender", StringType(), False),
    StructField("date", StringType(), False),
    StructField("cost", IntegerType(), False),
])

streaming_df = spark.readStream \
    .schema(dataframeSchema) \
    .csv(tempdir)

# Transformations on a streaming DataFrame are lazy; no actions like collect()
transformed_stream = streaming_df.withColumn("date_casted", F.to_date(F.col("date"))) \
    .withColumn("something_here", F.lit("hello_world"))

# Nothing runs until the query is started with writeStream ... .start()
query = transformed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Output:

+---------+---+------+----------+----+
|member_id|age|gender|date      |cost|
+---------+---+------+----------+----+
|1        |35 |Male  |2023-10-01|200 |
|1        |35 |Male  |2023-10-02|210 |
|2        |28 |Female|2023-10-01|150 |
|2        |28 |Female|2023-10-02|160 |
+---------+---+------+----------+----+

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+---+------+----------+----+-----------+--------------+
|member_id|age|gender|      date|cost|date_casted|something_here|
+---------+---+------+----------+----+-----------+--------------+
|        1| 35|  Male|2023-10-01| 200| 2023-10-01|   hello_world|
|        1| 35|  Male|2023-10-02| 210| 2023-10-02|   hello_world|
|        2| 28|Female|2023-10-01| 150| 2023-10-01|   hello_world|
|        2| 28|Female|2023-10-02| 160| 2023-10-02|   hello_world|
+---------+---+------+----------+----+-----------+--------------+

0 votes

This is not really the intended way to use a streaming DataFrame. You are correctly creating one with .format("delta").load(...), but from that point on you want to apply SQL-style operations such as filters, projections, and aggregations. You can find some examples here.

Specifically, instead of passing raw_stream_df through your functions one by one, you can do something like:

# Transform your raw DataFrame to get another DataFrame
df = raw_stream_df.filter("foo > 5").groupBy("bar").count()

# Create a DataStreamWriter by specifying the sink format
# (in practice a Delta sink also needs a target path and a checkpointLocation option)
data_stream_writer = df.writeStream.format("delta")

# Start your query
data_stream_writer.start()

Overall, the basic steps for writing the query should be:

  1. Create a DataFrame from your source (you already have this)
  2. Transform the DataFrame using the Structured Streaming APIs
  3. Tell Structured Streaming about your sink with .writeStream
  4. Start the query with .start()
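
Putting those four steps together for the delta source from the question, a minimal sketch might look like the following (the output and checkpoint paths are placeholders, and convert_to_local_time is assumed to apply only DataFrame transformations, never actions such as collect()):

# 1. Create the streaming DataFrame from the delta source
raw_stream_df = spark.readStream.format("delta").load(raw_table_path)

# 2. Transform it with ordinary DataFrame operations
df = convert_to_local_time(raw_stream_df)

# 3 + 4. Describe the sink and start the query; a delta sink needs
# a checkpoint location and a target path (placeholders here)
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/output")

query.awaitTermination()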

Note that .start() launches the query in the background, so the method returns even while the stream keeps running.
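
For example, the StreamingQuery handle returned by .start() can be used to monitor the background query or block on it (a small sketch reusing data_stream_writer from above):

query = data_stream_writer.start()

print(query.isActive)      # True while the query is running in the background
print(query.status)        # current status, e.g. whether a trigger is active

query.awaitTermination()   # block this thread until the query stops
# ...or stop it explicitly with query.stop()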
