STREAMING_CONNECT_SERIALIZATION_ERROR when applying a defined function in a foreachBatch operation in Spark Structured Streaming

Problem description

I am experimenting with Spark Structured Streaming and have run into an issue for which I cannot see the root cause or a solution.

I defined a class Reader containing a function read_events that takes a transactions_df DataFrame as input, reads from a Delta table, and joins the output of that read with the incoming DataFrame. The function returns a DataFrame.

Code for the Reader class, including the transformation function:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

class Reader():

    def __init__(self, spark):
        self.spark = spark
        self.events_table = "events_table"

    def read_events(self, transactions_df):

        events_df = self.spark.read.table(self.events_table)

        result_df = (
            transactions_df.alias("T")
            .join(
                events_df.alias("E"),
                col("E.col1") == col("T.col1"),
            )
            .select(
                col("E.col1"),
                col("E.col2"),
            )
        )

        return result_df

Executing this function on its own works as expected; the statements below produce the expected output.

input_df = spark.read.table("transactions_table")
Reader(spark).read_events(input_df).display()

However, when I try to apply this function to the micro-batch output of a streaming query inside foreachBatch, I get the following error message:

[STREAMING_CONNECT_SERIALIZATION_ERROR] Cannot serialize the function `foreachBatch`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`.

Code for the Streaming class:

from pyspark.sql import DataFrame

class Streaming():

    def __init__(self, spark):
        self.spark = spark
        self.transactions_table = "transactions_table"
        self.transactions_table_stream_checkpoint = "checkpoint_path"
        self.reader = Reader(spark)

    def process_batch_of_messages(self, df, batch_id):
        result_df = self.reader.read_events(df)
        print(f"For batch {batch_id} we have {result_df.count()} records.")

    def launch(self):
        (
            self.spark.readStream.format("delta")
            .option("skipChangeCommits", "true")
            .table(self.transactions_table)
            .writeStream.option("checkpointLocation", self.transactions_table_stream_checkpoint)
            .foreachBatch(
                lambda transactions, batch_id: self.process_batch_of_messages(
                    df=transactions, batch_id=batch_id
                )
            )
            .start()
        )


def entrypoint():
    stream = Streaming(spark)
    stream.launch()


if __name__ == "__main__":
    entrypoint()

Any help or pointers in the right direction on the root cause and a possible solution would be much appreciated!

  • The code runs in a Databricks notebook
  • Each class is in a separate cell
pyspark databricks spark-structured-streaming
1 Answer

You are seeing this error because you pull the local Spark session into the foreachBatch function. That happens when you initialize Reader:
self.reader = Reader(spark)

and in Reader's read_events method you then use:

events_df = self.spark.read.table(self.events_table)

As the error states,

If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For foreachBatch, please access the Spark session using df.sparkSession, where df is the first parameter in your foreachBatch function
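In practice, that means the foreachBatch handler should take the session from the DataFrame it receives rather than from anything captured outside the function. A minimal sketch of the pattern (the table name lookup_table and the join column col1 are just placeholders):

def process_batch(batch_df, batch_id):
    # Take the session from the micro-batch DataFrame itself,
    # not from a session captured outside this function.
    spark = batch_df.sparkSession

    # Placeholder lookup table; any per-batch read goes through this session.
    lookup_df = spark.read.table("lookup_table")

    joined_df = batch_df.join(lookup_df, "col1")
    print(f"Batch {batch_id}: {joined_df.count()} records")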

Applied to your code: avoid referencing the locally captured Spark session from within the foreachBatch path and, in Reader.read_events, access the Spark session like this:

events_df = transactions_df.sparkSession.read.table(self.events_table)
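Put together, read_events would look roughly like this (a sketch of just that method, with the rest of your Reader class unchanged):

    def read_events(self, transactions_df):
        # Use the session attached to the incoming micro-batch DataFrame
        # instead of the one stored on the Reader instance.
        events_df = transactions_df.sparkSession.read.table(self.events_table)

        result_df = (
            transactions_df.alias("T")
            .join(
                events_df.alias("E"),
                col("E.col1") == col("T.col1"),
            )
            .select(
                col("E.col1"),
                col("E.col2"),
            )
        )

        return result_df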

Unfortunately, this is a breaking change introduced by Spark Connect; code that used to work may need some changes before it runs under Spark Connect.
