I have several functions I want to run, but I am getting an error message telling me that I need to use .start(). Can someone tell me how to proceed?
Here is part of my code:
def parse_data(df):
    cachedDf = df.cache()
    # Do some more logic
    return df.collect()

def convert_to_local_time(df):
    # Logic for converting to local time
    return df

raw_stream_df = spark.readStream.format("delta").load(raw_table_path)
df = parse_data(raw_stream_df)
df = convert_to_local_time(df)
Here is a working example of how to do Structured Streaming properly.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.master("local").getOrCreate()
data1 = [
    [1, 35, "Male", "2023-10-01", 200],
    [1, 35, "Male", "2023-10-02", 210],
    [2, 28, "Female", "2023-10-01", 150],
    [2, 28, "Female", "2023-10-02", 160],
]
columns = ["member_id", "age", "gender", "date", "cost"]
df1 = spark.createDataFrame(data=data1, schema=columns)
df1.show(n=10, truncate=False)
tempdir = "../data/structstream/"
df1.write.format("csv").mode("overwrite").save(tempdir)
dataframeSchema = StructType([
    StructField("member_id", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("gender", StringType(), False),
    StructField("date", StringType(), False),
    StructField("cost", IntegerType(), False),
])
streaming_df = spark.readStream \
    .schema(dataframeSchema) \
    .csv(tempdir)
transformed_stream = streaming_df.withColumn("date_casted", F.to_date(F.col("date"))) \
    .withColumn("something_here", F.lit("hello_world"))
query = transformed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
Output:
+---------+---+------+----------+----+
|member_id|age|gender|date |cost|
+---------+---+------+----------+----+
|1 |35 |Male |2023-10-01|200 |
|1 |35 |Male |2023-10-02|210 |
|2 |28 |Female|2023-10-01|150 |
|2 |28 |Female|2023-10-02|160 |
+---------+---+------+----------+----+
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+---+------+----------+----+-----------+--------------+
|member_id|age|gender| date|cost|date_casted|something_here|
+---------+---+------+----------+----+-----------+--------------+
| 1| 35| Male|2023-10-01| 200| 2023-10-01| hello_world|
| 1| 35| Male|2023-10-02| 210| 2023-10-02| hello_world|
| 2| 28|Female|2023-10-01| 150| 2023-10-01| hello_world|
| 2| 28|Female|2023-10-02| 160| 2023-10-02| hello_world|
+---------+---+------+----------+----+-----------+--------------+
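One note on the example: .start() launches a continuously running query, and query.awaitTermination() blocks until it is stopped. If you only want to process the files that are already in the directory and then exit, a one-shot trigger is an option. This is just a sketch: availableNow needs a reasonably recent Spark version (3.3+), and older releases use trigger(once=True) instead.
# Sketch: process whatever input is currently available, then stop on its own.
# availableNow requires Spark 3.3+; on older versions trigger(once=True) plays a similar role.
query = transformed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(availableNow=True) \
    .start()

query.awaitTermination()  # returns once the available data has been processed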
This is not the intended way to work with streaming DataFrames. You are creating the streaming DataFrame correctly with .format("delta").load(...), but from that point on you really want to apply SQL-style operations to it, such as filtering, projections, aggregations and so on. You can find some examples here.
Concretely, instead of passing raw_stream_df through your functions one by one, you can do something like this:
# Transform your raw DataFrame to get another DataFrame
df = raw_stream_df.filter("foo > 5").groupBy("bar").count()
# Create a DataStreamWriter by specifying the sink format
data_stream_writer = df.writeStream.format("delta")
# Start your query
data_stream_writer.start()
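If the result should actually land in a Delta table, the sink also needs a target path (or table) and a checkpoint location. A minimal sketch, where output_table_path and checkpoint_path are placeholder locations of my own; complete output mode is used here because the df above ends in a groupBy aggregation:
# Sketch: writing the aggregated stream to a Delta sink.
# output_table_path and checkpoint_path are hypothetical locations.
query = df.writeStream \
    .format("delta") \
    .outputMode("complete") \
    .option("checkpointLocation", checkpoint_path) \
    .start(output_table_path)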
In general, the basic steps of writing a query are: transform your streaming DataFrame with the operations you need, call .writeStream to tell Structured Streaming about your sink, and then call .start() to launch the query.
Note that .start() starts the query in the background, so the method returns even though the stream keeps running.
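Putting the pieces together for your case, a rough sketch could look like the following. It assumes parse_data and convert_to_local_time can be rewritten as plain DataFrame transformations (the column names used inside them are made up here), and output_path and checkpoint_path are placeholder locations:
import pyspark.sql.functions as F

# Sketch only: the helpers return lazy, transformed streaming DataFrames
# instead of calling .collect() or .cache().
def parse_data(df):
    # hypothetical transformation standing in for your parsing logic
    return df.filter(F.col("cost") > 0)

def convert_to_local_time(df):
    # hypothetical conversion standing in for your timezone logic
    return df.withColumn("local_date", F.to_date(F.col("date")))

raw_stream_df = spark.readStream.format("delta").load(raw_table_path)
df = convert_to_local_time(parse_data(raw_stream_df))

# checkpoint_path and output_path are placeholder locations
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .start(output_path)

# .start() returns immediately; block here if the driver should wait for the stream
query.awaitTermination()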