在 pyspark 流中保存数据帧

问题描述 投票:0回答:1

我希望在流中存储 PySpark DataFrame,针对每个批次对其进行更改,然后使用 foreachBatch 再次保存更新的 DataFrame。实现这一目标的最简单方法是什么。

我正在使用流检查点。

我的代码结构:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.streaming import StreamingQuery

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

checkpoint_dir = "/path/to/checkpoint"

input_df = spark.readStream.format("your_input_source").load()

def process_batch(df, batch_id):
   pass


query = (
    input_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

query.awaitTermination()

python pyspark spark-structured-streaming
1个回答
0
投票

foreachBatch
运算符实际上是一种接收器,它允许您在每个微批次末尾将生成的 DataFrame 写入您选择的位置(任何内置连接器本身不支持该位置)。例如,这可能是还没有 Spark 连接器的 OLTP 数据库,或者您可能需要写入多个接收器。这些确实应该是使用
foreachBatch
的唯一原因。

如果您可以使用流式运算符,例如选择/过滤、重复数据删除、窗口聚合或Python中的

任意状态操作
,则不应尝试在foreachBatch中进行数据处理。 (免责声明:该博客来自我工作的 Databricks,但它没有任何 Databricks 特定的内容。)

如果您绝对确定这些解决方案都不适合您,您可以在

process_batch
中转换您的 DataFrame 并将其写入某处。

所以,你的代码最终应该看起来像这样:

query = (
    input_df
    # Apply built-in operators, such as a filter or deduplication
    .filter("foo > 10")
    # Fill out the parameters if you're deduplicating
    .dropDuplicatesWithinWatermark(...)
    .writeStream
    # process_batch should write to a non-built-in sink
    # If the sink is built-in, use .format to specify it
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)
© www.soinside.com 2019 - 2024. All rights reserved.