我使用了包含查询的数据框
df : Dataframe =spark.sql(s"show Partitions $yourtablename")
现在分区数量每天都在变化,因为它每天都在运行。
主要关心的是我需要获取最新的分区。
假设我获得特定日期的随机表的分区 喜欢
year=2019/month=1/day=1
year=2019/month=1/day=10
year=2019/month=1/day=2
year=2019/month=1/day=21
year=2019/month=1/day=22
year=2019/month=1/day=23
year=2019/month=1/day=24
year=2019/month=1/day=25
year=2019/month=1/day=26
year=2019/month=2/day=27
year=2019/month=2/day=3
现在您可以看到它对分区进行排序的功能,以便在
day=1
之后出现 day=10
。这会产生一个问题,因为我需要获取最新的分区。
我已经成功使用
获取分区val df =dff.orderby(col("partition").desc.limit(1)
但这给了我 tail -1 分区,而不是最新的分区。
如何克服hives排列分区的限制,从表中获取最新的分区?
所以假设在上面的例子中我需要接听
year=2019/month=2/day=27
而不是
year=2019/month=2/day=3
这是表中的最后一个分区。
您可以从
SHOW PARTITIONS
获取最大分区
spark.sql("SHOW PARTITIONS my_database.my_table").select(max('partition)).show(false)
我不会依赖位置依赖,但如果你这样做,我至少会有年=2019/月=2/日=03。
我会通过 SQL 语句依赖分区修剪和 SQL。我不确定你是否使用 ORC、PARQUET 等,但分区修剪应该是一个常见的方法。
例如
val df = sparkSession.sql(""" select max(partition_col)
from randomtable
""")
val maxVal = df.first().getString(0) // this as sql result is a DF
另请参阅 https://mapr.com/blog/tips-and-best-practices-to-take-advantage-of-spark-2-x/
这段代码肯定可以以某种方式写得更好,但它可以工作,并且通过 pyspark 加载整个数据并通过
concat_ws('-', year, month, day')
找到最大值,速度肯定更快。
首先让我们添加将潜在
dbfs:/...
路径转换为本地路径 /dbfs/...
的函数。这是必需的,因为我们将在后面的步骤中使用 os.walk
函数。如果提供的路径不是dbfs路径,可以忽略。
from typings import Union
from pathlib import Path
def get_local_path(path: Union[str, Path]) -> str:
"""
Transforms a potential dbfs path to a
path accessible by standard file system operations
:param path: Path to transform to local path
:return: The local path
"""
return str(path).replace("dbfs:", "/dbfs")
现在让我们找到最大值,如果您有如下所示的文件结构:
year=2024-9999/
month=01-12/
day=01-31/
要找到所有这些路径之间的最大值,请迭代目录并将值动态添加到字典中。
def get_max_year_month_day_folder(path: str) -> str:
max_dates = {
"year": "0000",
"month": "00",
"day": "00",
}
date_keys = list(max_dates.keys())
for root, dirs, files in os.walk(get_local_path(path)):
for directory_name in dirs:
date_key = ""
for date_key in date_keys:
if date_key in directory_name:
break
if date_key in directory_name:
cur = max_dates.get(date_key)
new = directory_name.split("=")[1]
if int(new) > int(cur):
max_dates[date_key] = new
return f"{max_dates['year']}-{max_dates['month']}-{max_dates['day']}"
path = "dbfs:/mnt/..."
max_date = get_max_year_month_day_folder(path)