如何使用pyspark filestream读取最近一小时内上传的新文件?

问题描述 投票:0回答:1

我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流。我已经尝试了 Spark Streaming 的 maxFileAge 选项,但它仍然加载目录中的所有文件,无论选项中指定的时间如何。

spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
    .csv(upload_path) \
    .withColumn("closing_date", get_date_udf_func(input_file_name()))\
    .writeStream.format('parquet') \
    .trigger(once=True) \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', write_path) \
    .start()

上面是我尝试过的代码,但无论时间如何,它都会加载所有可用文件。请指出我在这里做错了什么..

scala apache-spark pyspark databricks spark-structured-streaming
1个回答
0
投票

抱歉来得太晚了。

.option("maxFileAge", "1h")
仅在创建 checkpointLocation 之后才有效。

示例:

第一次运行,阅读所有内容。 第二次运行,检查 checkpointLocation 是否创建,然后按 maxFileAge 过滤并将这些文件添加到

checkpointLocation

您可以仅读取带有基本路径的 readstream 中的文件来创建单个检查点位置,然后 maxage 参数将在下一批中起作用。

spark.readStream\
.option("maxFileAge", "1h")\
.option("basePath", upload_path)\
.schema(cust_schema)\
    .csv(upload_path + "/partition1=xxxx/partition2=yyyy/") \
...
© www.soinside.com 2019 - 2024. All rights reserved.