如何根据日期范围过滤镶木地板分区?

问题描述 投票:0回答:2

我有分区镶木地板数据:

dir/batch_date=2023-02-13/batch_hour=09

我必须通过 spark 程序读取最近 14 天的数据。目前,我正在读取数据并在数据框上应用日期过滤器作为 batch_date 负 14 天。 有什么方法可以设置一系列目录以将读取限制为仅最近 14 天的目录而不是整个数据集。

谢谢

scala apache-spark filter apache-spark-sql partition
2个回答
1
投票

Spark 旨在高效读取分区数据。在读取分区数据时,Spark 只读取执行所需操作所需的文件和分区,避免读取整个数据集。

要在Spark中高效读取分区数据,读取数据时必须指定分区结构。

在你的例子中,分区是“batch_date”所以要读取前 14 天的数据,你只需要这样做:

import org.apache.spark.sql.functions.{col, date_sub}

// Calculate the date 14 days ago
val cutoff_date = date_sub(current_date(), 14)

// Read data from directories for the last 2 weeks
val data = spark.read.parquet("/path/to/data")
  .filter(col("batch_date") >= date_format(cutoff_date, "yyyy-MM-dd"))

0
投票

你已经在做的是最佳的,因为 PartitionFilters 在 apache spark 中的概念,所以当你在分区列上应用过滤器时,这些过滤器在源数据上应用,在通过网络发送任何数据之前,减少传输的数据量。

例如,我有一些按年份分区的数据:

/path/
   Year=2018/
       file.parquet
   Year=2019/
       file.parquet
   Year=2020/
       file.parquet
   Year=2021/
       file.parquet
   Year=2022/
       file.parquet
   Year=2023/
       file.parquet

如果我应用以下代码:

spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()

我会得到以下的体检计划:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>

如果您搜索 PartitionFilters,您会发现:

PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]

这意味着应用了分区过滤器,并且只会加载所需的分区,但是如果您没有看到 PartitionFilters,则意味着出现问题并且将加载整个数据

如果由于某种原因 PartitionFilters 不起作用,您可以随时使用 hadoop 来过滤您想要使用 spark 加载的文件夹

val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)

然后使用 spark 读取 filesToRead。

© www.soinside.com 2019 - 2024. All rights reserved.