我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流。我已经尝试了 Spark Streaming 的 maxFileAge 选项,但它仍然加载目录中的所有文件,无论选项中指定的时间如何。
spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
.csv(upload_path) \
.withColumn("closing_date", get_date_udf_func(input_file_name()))\
.writeStream.format('parquet') \
.trigger(once=True) \
.option('checkpointLocation', checkpoint_path) \
.option('path', write_path) \
.start()
上面是我尝试过的代码,但无论时间如何,它都会加载所有可用文件。请指出我在这里做错了什么..
抱歉来得太晚了。
.option("maxFileAge", "1h")
仅在创建 checkpointLocation 之后才有效。
示例:
第一次运行,阅读所有内容。 第二次运行,检查 checkpointLocation 是否创建,然后按 maxFileAge 过滤并将这些文件添加到
checkpointLocation
。
您可以仅读取带有基本路径的 readstream 中的文件来创建单个检查点位置,然后 maxage 参数将在下一批中起作用。
spark.readStream\
.option("maxFileAge", "1h")\
.option("basePath", upload_path)\
.schema(cust_schema)\
.csv(upload_path + "/partition1=xxxx/partition2=yyyy/") \
...