我在 Lake 中的文件被分成 2 个分区,partition_Continent 和 Partition_Country。
我有一个
df_grouped
告诉我要使用哪个过滤器,例如,假设这给了我 2 条记录:
我根据上面的
df_grouped
创建了一个filter_condition:
filter_condition = " OR ".join(
[
(
f"(partition_Continent = '{i.Continent}'"
f" AND partition_Country = '{i.Country}')"
)
for i in df_grouped.distinct().collect()
]
)
所以当我跑步时
df_presented = spark.read.parquet(f'abfss://[email protected]/Tommy').where(filter_condition)
它会给我这两个分区中的所有记录。
但我相信直接从分区读取比使用 where 子句更快,即类似
df_presented = spark.read.parquet(f'abfss://[email protected]/Tommy/partition_Continent=Europe/partition_Country=UK').union(spark.read.parquet(f'abfss://[email protected]/Tommy/partition_Continent=Asia/partition_Country=China'))
如何根据
df_grouped
中返回的内容实现此目的?
一个简单的 for 循环应该在这里工作:
from pyspark.sql.types import StructType
df_presented = spark.createDataFrame([], StructType([]))
for row in df_grouped.distinct().collect():
file_path = f'abfss://[email protected]/Tommy/partition_Continent={row.Continent}/partition_Country={row.Country}'
try:
df_presented = df_presented.unionByName(spark.read.parquet(file_path), allowMissingColumns=True)
except:
print("some error")