我希望在流中存储 PySpark DataFrame,针对每个批次对其进行更改,然后使用 foreachBatch 再次保存更新的 DataFrame。实现这一目标的最简单方法是什么。
我正在使用流检查点。
我的代码结构:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.streaming import StreamingQuery
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
checkpoint_dir = "/path/to/checkpoint"
input_df = spark.readStream.format("your_input_source").load()
def process_batch(df, batch_id):
pass
query = (
input_df.writeStream
.foreachBatch(process_batch)
.outputMode("append")
.option("checkpointLocation", checkpoint_dir)
.start()
)
query.awaitTermination()
foreachBatch
运算符实际上是一种接收器,它允许您在每个微批次末尾将生成的 DataFrame 写入您选择的位置(任何内置连接器本身不支持该位置)。例如,这可能是还没有 Spark 连接器的 OLTP 数据库,或者您可能需要写入多个接收器。这些确实应该是使用foreachBatch
的唯一原因。
如果您可以使用流式运算符,例如选择/过滤、重复数据删除、窗口聚合或Python中的
任意状态操作,则不应尝试在
foreachBatch
中进行数据处理。 (免责声明:该博客来自我工作的 Databricks,但它没有任何 Databricks 特定的内容。)
如果您绝对确定这些解决方案都不适合您,您可以在
process_batch
中转换您的 DataFrame 并将其写入某处。
所以,你的代码最终应该看起来像这样:
query = (
input_df
# Apply built-in operators, such as a filter or deduplication
.filter("foo > 10")
# Fill out the parameters if you're deduplicating
.dropDuplicatesWithinWatermark(...)
.writeStream
# process_batch should write to a non-built-in sink
# If the sink is built-in, use .format to specify it
.foreachBatch(process_batch)
.outputMode("append")
.option("checkpointLocation", checkpoint_dir)
.start()
)