我有分区镶木地板数据:
dir/batch_date=2023-02-13/batch_hour=09
我必须通过 spark 程序读取最近 14 天的数据。目前,我正在读取数据并在数据框上应用日期过滤器作为 batch_date 负 14 天。 有什么方法可以设置一系列目录以将读取限制为仅最近 14 天的目录而不是整个数据集。
谢谢
Spark 旨在高效读取分区数据。在读取分区数据时,Spark 只读取执行所需操作所需的文件和分区,避免读取整个数据集。
要在Spark中高效读取分区数据,读取数据时必须指定分区结构。
在你的例子中,分区是“batch_date”所以要读取前 14 天的数据,你只需要这样做:
import org.apache.spark.sql.functions.{col, date_sub}
// Calculate the date 14 days ago
val cutoff_date = date_sub(current_date(), 14)
// Read data from directories for the last 2 weeks
val data = spark.read.parquet("/path/to/data")
.filter(col("batch_date") >= date_format(cutoff_date, "yyyy-MM-dd"))
你已经在做的是最佳的,因为 PartitionFilters 在 apache spark 中的概念,所以当你在分区列上应用过滤器时,这些过滤器在源数据上应用,在通过网络发送任何数据之前,减少传输的数据量。
例如,我有一些按年份分区的数据:
/path/
Year=2018/
file.parquet
Year=2019/
file.parquet
Year=2020/
file.parquet
Year=2021/
file.parquet
Year=2022/
file.parquet
Year=2023/
file.parquet
如果我应用以下代码:
spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()
我会得到以下的体检计划:
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>
如果您搜索 PartitionFilters,您会发现:
PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]
这意味着应用了分区过滤器,并且只会加载所需的分区,但是如果您没有看到 PartitionFilters,则意味着出现问题并且将加载整个数据
如果由于某种原因 PartitionFilters 不起作用,您可以随时使用 hadoop 来过滤您想要使用 spark 加载的文件夹
val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)
然后使用 spark 读取 filesToRead。