如何读取apache Spark中最新的分区

问题描述 投票:0回答:3

我使用了包含查询的数据框

df : Dataframe =spark.sql(s"show Partitions $yourtablename")

现在分区数量每天都在变化,因为它每天都在运行。

主要关心的是我需要获取最新的分区。

假设我获得特定日期的随机表的分区 喜欢

year=2019/month=1/day=1
year=2019/month=1/day=10
year=2019/month=1/day=2
year=2019/month=1/day=21
year=2019/month=1/day=22
year=2019/month=1/day=23
year=2019/month=1/day=24
year=2019/month=1/day=25
year=2019/month=1/day=26
year=2019/month=2/day=27
year=2019/month=2/day=3

现在您可以看到它对分区进行排序的功能,以便在

day=1
之后出现
day=10
。这会产生一个问题,因为我需要获取最新的分区。

我已经成功使用

获取分区
val df =dff.orderby(col("partition").desc.limit(1)

但这给了我 tail -1 分区,而不是最新的分区。

如何克服hives排列分区的限制,从表中获取最新的分区?

所以假设在上面的例子中我需要接听

 year=2019/month=2/day=27

而不是

year=2019/month=2/day=3

这是表中的最后一个分区。

dataframe apache-spark apache-spark-sql rdd natural-sort
3个回答
2
投票

您可以从

SHOW PARTITIONS

获取最大分区
spark.sql("SHOW PARTITIONS my_database.my_table").select(max('partition)).show(false)

1
投票

我不会依赖位置依赖,但如果你这样做,我至少会有年=2019/月=2/日=03。

我会通过 SQL 语句依赖分区修剪和 SQL。我不确定你是否使用 ORC、PARQUET 等,但分区修剪应该是一个常见的方法。

例如

 val df = sparkSession.sql(""" select max(partition_col)
                                 from randomtable 
                           """)

 val maxVal = df.first().getString(0) // this as sql result is a DF

另请参阅 https://mapr.com/blog/tips-and-best-practices-to-take-advantage-of-spark-2-x/


0
投票

这段代码肯定可以以某种方式写得更好,但它可以工作,并且通过 pyspark 加载整个数据并通过

concat_ws('-', year, month, day')
找到最大值,速度肯定更快。

首先让我们添加将潜在

dbfs:/...
路径转换为本地路径
/dbfs/...
的函数。这是必需的,因为我们将在后面的步骤中使用
os.walk
函数。如果提供的路径不是dbfs路径,可以忽略。


from typings import Union
from pathlib import Path

def get_local_path(path: Union[str, Path]) -> str:
    """
    Transforms a potential dbfs path to a
    path accessible by standard file system operations

    :param path: Path to transform to local path
    :return: The local path
    """
    return str(path).replace("dbfs:", "/dbfs")

现在让我们找到最大值,如果您有如下所示的文件结构:

year=2024-9999/
    month=01-12/
         day=01-31/

要找到所有这些路径之间的最大值,请迭代目录并将值动态添加到字典中。


def get_max_year_month_day_folder(path: str) -> str:
    
    max_dates = {
        "year": "0000",
        "month": "00",
        "day": "00",
    }

    date_keys = list(max_dates.keys())
    for root, dirs, files in os.walk(get_local_path(path)):
        for directory_name in dirs:
            
            date_key = ""
            for date_key in date_keys:
                if date_key in directory_name:
                    break
            
            if date_key in directory_name:
                cur = max_dates.get(date_key)
                new = directory_name.split("=")[1]
                if int(new) > int(cur):
                    max_dates[date_key] = new

    return f"{max_dates['year']}-{max_dates['month']}-{max_dates['day']}"

path = "dbfs:/mnt/..."
max_date = get_max_year_month_day_folder(path)

© www.soinside.com 2019 - 2024. All rights reserved.