我有两张桌子;销售和客户。要查询的主要表是sales,但有时我们会想要获取有关特定客户的数据并获取有关他的详细信息,因此我们必须在sales 表上加入customers。因此,对于包含用户的表,主筛选列将是 USER_ID。我想根据 user_id 对表进行分区,但 Databricks 为每个 userid 创建了一个分区文件。我想要做的是将表分成多个文件,其中有多个用户 ID 连续保存,例如 partition1 将有 ID 1-1000 的用户,partition2 将有 ID 1001-2000 的用户等等。
我对按日期分区有同样的问题,因为它每天创建一个分区文件,但我想将它存储为例如 5 天的范围。
有什么方法可以在列内的rang上存储分区?以及如何影响创建了多少个这样的分区?
到目前为止,我已经使用了 df.write.partitionBy(‘column_name’).parquet(‘location’),它造成了上述问题。
我通过生成一个值来从有问题的 id 进行分区,从而做了类似的事情。如果它们是数字 ID,您可以将它们与您希望收集的所需分区的数量进行模数运算,如果它们是足够随机的数字,甚至是连续的数字,它们应该可以很好地减少偏斜......
您可以通过指定分区方案来自定义分区的创建方式。您可以定义自己的分区函数,根据列中的值范围对数据进行分组,而不是使用基于列中不同值的默认分区。
下面是一个示例,说明如何根据
USER_ID
的范围对销售表进行分区:
from pyspark.sql.functions import *
def user_id_range_partition(user_id):
return floor(user_id / 1000)
sales.write.partitionBy(user_id_range_partition('USER_ID')).parquet('location')
在这里,
user_id_range_partition
函数采用 USER_ID
值并返回 floor
值除以 1000,这将 USER_IDs 分组为 1000 的范围。例如,USER_IDs 1-1000 将在分区 0 中,USER_IDs 1001 -2000 将在分区 1 中,依此类推。
同样,日期也可以这样做-
from pyspark.sql.functions import *
# Define the partitioning function that groups dates into ranges of 5 days
def date_range_partition(date_col):
start_date = to_date(lit('2022-01-01')) # define your own start date
days_since_start = floor((date_col - start_date).cast('int') / 5) * 5
return date_add(start_date, days_since_start)
# Partition the sales table based on date_range_partition function
sales.withColumn('sale_date_range', date_range_partition('SALE_DATE')).write.partitionBy('sale_date_range').parquet('location')
此外,您也可以使用
bucketBy
。它的工作方式是根据指定列的哈希值将数据分布到固定数量的桶中。这对于在固定数量的文件中均匀分布数据很有用,同时仍然允许基于用于分桶的列进行有效过滤。例如 - 您可以使用 bucketBy
根据 sales
列在 10 个桶中分配 USER_ID
数据 -
from pyspark.sql.functions import floor
# Define the number of buckets and the bucketing column
num_buckets = 10
bucket_column = 'USER_ID'
# Define the bucketing function that hashes USER_ID into one of 10 buckets
def bucket_user_id(user_id):
return user_id % num_buckets
# Bucket the sales table based on the bucket_user_id function and the bucket_column
sales.write.bucketBy(num_buckets, bucket_column, {'numBuckets': num_buckets}).parquet('location')