I want to partition/group rows so that the total size of each group is <= limit.
For example, if I have:
+--------+----------+
| id| size|
+--------+----------+
| 1| 3|
| 2| 6|
| 3| 8|
| 4| 5|
| 5| 7|
| 6| 7|
+--------+----------+
I want to group the rows so that each group's total size is <= 10; the result would be:
+--------+----------+----------+
| id| size| group|
+--------+----------+----------+
| 1| 3| 0|
| 2| 6| 0|
| 3| 8| 1|
| 4| 5| 2|
| 5| 7| 3|
| 6| 7| 4|
+--------+----------+----------+
Another example, grouping so that each group's total size is <= 13:
+--------+----------+----------+
| id| size| group|
+--------+----------+----------+
| 1| 3| 0|
| 2| 6| 0|
| 3| 8| 1|
| 4| 5| 1|
| 5| 7| 2|
| 6| 7| 3|
+--------+----------+----------+
I'm not even sure where to start. I've looked into window functions, reduce functions, user-defined aggregate functions, and adding helper columns (e.g. a cumulative sum)...
The original task is to batch request payloads: group them into single requests while staying under a size limit.
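To make the intended rule concrete, the grouping can be written as a single greedy pass over the sizes in plain Python (the sequential dependency is exactly what makes it awkward to express in Spark):

```python
def greedy_groups(sizes, limit):
    """Assign each size to the current group until adding it would
    exceed the limit, then start a new group (one greedy pass)."""
    groups, current, running = [], 0, 0
    for s in sizes:
        # Start a new group only if this item would overflow a
        # non-empty group; an oversized single item keeps its own group.
        if running + s > limit and running > 0:
            current += 1
            running = 0
        groups.append(current)
        running += s
    return groups

sizes = [3, 6, 8, 5, 7, 7]
print(greedy_groups(sizes, 10))  # [0, 0, 1, 2, 3, 4]
print(greedy_groups(sizes, 13))  # [0, 0, 1, 1, 2, 3]
```

Both calls reproduce the expected tables above.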
Here is one approach. Since your application does not require exact correctness, we can settle for approximate correctness.
First we partition the rows into randomly assigned groups of a suitable size. Within each of those groups we use a grouped-map pandas_udf to find sub-groups that pack close to the best number of rows under payload_limit.
Here is a possible example.
import math
from pyspark.sql.functions import avg, floor, rand, pandas_udf, PandasUDFType
from pyspark.sql.functions import col, sum, row_number, monotonically_increasing_id, count
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()
data = [
    (1, 3),
    (2, 6),
    (3, 8),
    (4, 5),
    (5, 7),
    (6, 7),
    (11, 3),
    (12, 6),
    (13, 8),
    (14, 5),
    (15, 7),
    (16, 7),
    (21, 3),
    (22, 6),
    (23, 8),
    (24, 5),
    (25, 7),
    (26, 7),
]
df = spark.createDataFrame(data, ["id", "size"])
# Find the average size
avg_size = df.select(avg("size")).collect()[0][0]
payload_limit = 20
rows_per_group = math.floor(payload_limit / avg_size)
print(f"{avg_size=}")
print(f"{rows_per_group=}")
print(f"{df.count()=}")
nums_of_group = math.ceil(df.count() / rows_per_group)
print(f"{nums_of_group=}")
df = df.withColumn("random_group_id", floor(rand() * nums_of_group))
distinct_group_ids = df.select(col("random_group_id")).distinct()
distinct_group_ids.show(n=100, truncate=False)
print(f"{distinct_group_ids.count()}")
grouped_counts = df.groupby(col("random_group_id")).agg(count("*"))
grouped_counts.show(n=100, truncate=False)
df.show(n=100, truncate=False)
result_schema = StructType([
    StructField("id", IntegerType()),
    StructField("size", IntegerType()),
    StructField("random_group_id", IntegerType()),
    StructField("sub_group", IntegerType()),
])
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def group_by_limit(pdf):
    limit = payload_limit
    group_col = "sub_group"
    print("before")
    print(pdf)
    # Calculate the cumulative sum of sizes within each random group
    pdf["cum_size"] = pdf.groupby("random_group_id")["size"].cumsum()
    # Assign sub-group numbers based on the cumulative sum and limit
    pdf[group_col] = pdf["cum_size"] // limit
    # Drop the cumulative sum column
    pdf = pdf.drop("cum_size", axis=1)
    print("after")
    print(pdf)
    return pdf
# Apply the pandas UDF to the DataFrame
grouped_df = df.groupby("random_group_id").apply(group_by_limit)
grouped_df.show()
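To see what the UDF does to a single batch, here is its cumulative-sum trick on its own in plain pandas, using the rows that land in one random group in the sample run (random_group_id=1 in the output further down):

```python
import pandas as pd

# One batch as the grouped-map UDF sees it (sizes from the sample run)
batch = pd.DataFrame({"id": [3, 5, 6, 16, 24],
                      "size": [8, 7, 7, 7, 5]})
limit = 20

# Running total of the sizes; each time it crosses a multiple of the
# limit, the integer quotient (and hence the sub-group) advances by one.
batch["cum_size"] = batch["size"].cumsum()       # 8, 15, 22, 29, 34
batch["sub_group"] = batch["cum_size"] // limit  # 0, 0, 1, 1, 1
```

Note this is only approximately correct, in line with the assumption above: a sub-group that starts just below a multiple of the limit can end up summing to slightly more than the limit.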
## Verify correctness of our algorithm.
result_grouped = grouped_df.groupBy("random_group_id", "sub_group").agg(sum("size"))
result_grouped.orderBy("random_group_id", "sub_group").show(n=100, truncate=False)
Output:
avg_size=6.0
rows_per_group=3
df.count()=18
nums_of_group=6
+---------------+
|random_group_id|
+---------------+
|5 |
|3 |
|1 |
|2 |
|4 |
+---------------+
5
+---------------+--------+
|random_group_id|count(1)|
+---------------+--------+
|5 |3 |
|3 |4 |
|1 |5 |
|2 |2 |
|4 |4 |
+---------------+--------+
+---+----+---------------+
|id |size|random_group_id|
+---+----+---------------+
|1 |3 |5 |
|2 |6 |3 |
|3 |8 |1 |
|4 |5 |5 |
|5 |7 |1 |
|6 |7 |1 |
|11 |3 |3 |
|12 |6 |3 |
|13 |8 |5 |
|14 |5 |2 |
|15 |7 |4 |
|16 |7 |1 |
|21 |3 |4 |
|22 |6 |2 |
|23 |8 |3 |
|24 |5 |1 |
|25 |7 |4 |
|26 |7 |4 |
+---+----+---------------+
+---+----+---------------+---------+
| id|size|random_group_id|sub_group|
+---+----+---------------+---------+
| 3| 8| 1| 0|
| 5| 7| 1| 0|
| 6| 7| 1| 1|
| 16| 7| 1| 1|
| 24| 5| 1| 1|
| 14| 5| 2| 0|
| 22| 6| 2| 0|
| 2| 6| 3| 0|
| 11| 3| 3| 0|
| 12| 6| 3| 0|
| 23| 8| 3| 1|
| 15| 7| 4| 0|
| 21| 3| 4| 0|
| 25| 7| 4| 0|
| 26| 7| 4| 1|
| 1| 3| 5| 0|
| 4| 5| 5| 0|
| 13| 8| 5| 0|
+---+----+---------------+---------+
+---------------+---------+---------+
|random_group_id|sub_group|sum(size)|
+---------------+---------+---------+
|1 |0 |15 |
|1 |1 |19 |
|2 |0 |11 |
|3 |0 |15 |
|3 |1 |8 |
|4 |0 |17 |
|4 |1 |7 |
|5 |0 |16 |
+---------------+---------+---------+
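Finally, if you want the single group column from the question's expected output, the (random_group_id, sub_group) pairs can be renumbered densely. A minimal sketch of that renumbering in plain Python (in Spark itself, dense_rank over a Window ordered by both columns achieves the same):

```python
# Pairs of (random_group_id, sub_group), e.g. collected from grouped_df
pairs = [(1, 0), (1, 0), (1, 1), (2, 0), (3, 0), (3, 1)]

# Map each distinct pair to a dense global group number
dense = {p: i for i, p in enumerate(sorted(set(pairs)))}
groups = [dense[p] for p in pairs]  # [0, 0, 1, 2, 3, 4]
```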