动态联合 Pyspark 数据帧

问题描述 投票:0回答:1

我在 Lake 中的文件被分成 2 个分区,partition_Continent 和 Partition_Country。

我有一个

df_grouped
告诉我要使用哪个过滤器,例如,假设这给了我 2 条记录:

enter image description here

我根据上面的

df_grouped
创建了一个filter_condition:

filter_condition = " OR ".join(
    [
        (
            f"(partition_Continent = '{i.Continent}'"
            f" AND partition_Country = '{i.Country}')"
        )
        for i in df_grouped.distinct().collect()
    ]
)

enter image description here

所以当我跑步时

df_presented = spark.read.parquet(f'abfss://[email protected]/Tommy').where(filter_condition)

它会给我这两个分区中的所有记录。

但我相信直接从分区读取比使用 where 子句更快,即类似

df_presented = spark.read.parquet(f'abfss://[email protected]/Tommy/partition_Continent=Europe/partition_Country=UK').union(spark.read.parquet(f'abfss://[email protected]/Tommy/partition_Continent=Asia/partition_Country=China'))

如何根据

df_grouped
中返回的内容实现此目的?

pyspark databricks azure-databricks azure-data-lake azure-data-lake-gen2
1个回答
0
投票

一个简单的 for 循环应该在这里工作:

from pyspark.sql.types import StructType

df_presented = spark.createDataFrame([], StructType([]))
for row in df_grouped.distinct().collect():
    file_path = f'abfss://[email protected]/Tommy/partition_Continent={row.Continent}/partition_Country={row.Country}'
    try:
        df_presented = df_presented.unionByName(spark.read.parquet(file_path), allowMissingColumns=True)
    except:
        print("some error")
© www.soinside.com 2019 - 2024. All rights reserved.