I am experimenting with Spark Structured Streaming and have run into a problem whose root cause and solution I cannot see.
I defined a class Reader that contains a function read_events. The function takes a transactions_df DataFrame as input, reads a Delta table, and joins the output of that read with the incoming DataFrame. The function returns a DataFrame.
The code of the Reader class, including the transformation function:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


class Reader:
    def __init__(self, spark):
        self.spark = spark
        self.events_table = "events_table"

    def read_events(self, transactions_df: DataFrame) -> DataFrame:
        events_df = self.spark.read.table(self.events_table)
        result_df = (
            transactions_df.alias("T")
            .join(
                events_df.alias("E"),
                col("E.col1") == col("T.col1"),
            )
            .select(
                col("E.col1"),
                col("E.col2"),
            )
        )
        return result_df
Executing this function standalone works as expected. The statements below give the expected output.
input_df = spark.read.table("transactions_table")
Reader(spark).read_events(input_df).display()
However, when I try to apply this function to the micro-batch output of a streaming query inside foreachBatch, I get the following error message:
[STREAMING_CONNECT_SERIALIZATION_ERROR] Cannot serialize the function `foreachBatch`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`.
The code of the Streaming class:
from pyspark.sql import DataFrame


class Streaming:
    def __init__(self, spark):
        self.spark = spark
        self.transactions_table = "transactions_table"
        self.transactions_table_stream_checkpoint = "checkpoint_path"
        self.reader = Reader(spark)

    def process_batch_of_messages(self, df, batch_id):
        result_df = self.reader.read_events(df)
        print(f"For batch {batch_id} we have {result_df.count()} records.")

    def launch(self):
        (
            self.spark.readStream.format("delta")
            .option("skipChangeCommits", "true")
            .table(self.transactions_table)
            .writeStream.option(
                "checkpointLocation", self.transactions_table_stream_checkpoint
            )
            .foreachBatch(
                lambda transactions, batch_id: self.process_batch_of_messages(
                    df=transactions, batch_id=batch_id
                )
            )
            .start()
        )


def entrypoint():
    stream = Streaming(spark)
    stream.launch()


if __name__ == "__main__":
    entrypoint()
Any help or pointers in the right direction regarding the root cause and a possible solution would be much appreciated!
The reason you see this error is that you bring the local Spark session into the foreachBatch function. That happens when you initialize Reader:
self.reader = Reader(spark)
and then, in Reader's read_events method, use:
events_df = self.spark.read.table(self.events_table)
As the error states:
If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For foreachBatch, please access the Spark session using df.sparkSession, where df is the first parameter in your foreachBatch function
You need to avoid capturing the local Spark session inside the foreachBatch function and instead, in Reader.read_events, access the Spark session through the incoming DataFrame:
events_df = transactions_df.sparkSession.read.table(self.events_table)
Unfortunately, this is a breaking change introduced by Spark Connect: code that previously worked needs some changes to run under Spark Connect.