Is there a way to partition/group rows of data so that the sum of a column's values within each group stays below a limit?


I want to partition/group rows so that the total size of each group is <= limit.

For example, if I have:

+--------+----------+
|      id|      size|
+--------+----------+
|       1|         3|
|       2|         6|
|       3|         8|
|       4|         5|
|       5|         7|
|       6|         7|
+--------+----------+

and I want to group the rows so that each group's total size is <= 10, the result would be:

+--------+----------+----------+
|      id|      size|     group|
+--------+----------+----------+
|       1|         3|         0|
|       2|         6|         0|
|       3|         8|         1|
|       4|         5|         2|
|       5|         7|         3|
|       6|         7|         4|
+--------+----------+----------+

Another example, with a per-group limit of <= 13:

+--------+----------+----------+
|      id|      size|     group|
+--------+----------+----------+
|       1|         3|         0|
|       2|         6|         0|
|       3|         8|         1|
|       4|         5|         1|
|       5|         7|         2|
|       6|         7|         3|
+--------+----------+----------+

I'm not even sure where to start. I've looked into window functions, reduce functions, user-defined aggregate functions, and adding helper columns (e.g. a cumulative sum), but haven't found a working approach.

The original task is to batch request payloads: group them so that each batch can be sent as a single request without exceeding a size limit. The sketch below illustrates the grouping logic I have in mind.
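To make the intent concrete, here is the grouping logic written as a plain, single-machine Python loop (using the limit <= 10 example values from above; it is only a sketch of the desired behaviour, not a Spark solution):

# Greedy grouping: start a new group whenever adding the next row
# would push the running total of "size" above the limit.
rows = [(1, 3), (2, 6), (3, 8), (4, 5), (5, 7), (6, 7)]
limit = 10

group, running = 0, 0
assignments = []  # (id, size, group)
for row_id, size in rows:
    if running + size > limit:
        group += 1
        running = 0
    running += size
    assignments.append((row_id, size, group))

print(assignments)
# groups come out as 0, 0, 1, 2, 3, 4, matching the limit <= 10 table above;
# with limit = 13 the same loop yields 0, 0, 1, 1, 2, 3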

apache-spark pyspark databricks databricks-sql scala-spark
1 Answer

Here is an example. Since your application does not require exact correctness, we can settle for an approximate grouping.

First we split the rows into buckets of a roughly suitable size. Within each bucket we use a grouped-map pandas_udf to find sub-groups that pack as many rows as possible while keeping each sub-group's total size under payload_limit.

Here is a possible example:

import math
from pyspark.sql.functions import avg, floor, rand, pandas_udf, PandasUDFType
from pyspark.sql.functions import col, sum, count
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

data = [
    (1, 3),
    (2, 6),
    (3, 8),
    (4, 5),
    (5, 7),
    (6, 7),
    (11, 3),
    (12, 6),
    (13, 8),
    (14, 5),
    (15, 7),
    (16, 7),
    (21, 3),
    (22, 6),
    (23, 8),
    (24, 5),
    (25, 7),
    (26, 7)
]
df = spark.createDataFrame(data, ["id", "size"])

# Find the average size
avg_size = df.select(avg("size")).collect()[0][0]

payload_limit = 20
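# Heuristic: from the average row size, estimate how many rows fit under the
# payload limit, and from that how many random buckets cover the whole DataFrame.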
rows_per_group = math.floor(payload_limit / avg_size)
print(f"{avg_size=}")
print(f"{rows_per_group=}")
print(f"{df.count()=}")
nums_of_group = math.ceil(df.count() / rows_per_group)
print(f"{nums_of_group=}")

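# Scatter the rows across roughly nums_of_group random buckets so each bucket is
# small enough to be handled in a single pandas UDF call; bucket contents (and
# their total size) will vary, which is where the approximation comes in.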
df = df.withColumn("random_group_id", floor(rand() * nums_of_group))
distinct_group_ids = df.select(col("random_group_id")).distinct()
distinct_group_ids.show(n=100, truncate=False)
print(f"{distinct_group_ids.count()}")

grouped_counts = df.groupby(col("random_group_id")).agg(count("*"))
grouped_counts.show(n=100, truncate=False)

df.show(n=100, truncate=False)

result_schema = StructType([
    StructField("id", IntegerType()),
    StructField("size", IntegerType()),
    StructField("random_group_id", IntegerType()),
    StructField("sub_group", IntegerType()),
])


@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def group_by_limit(pdf):
    limit = payload_limit
    group_col = "sub_group"

    print("before")
    print(pdf)

    # Calculate the cumulative sum of sizes within each random group
    pdf["cum_size"] = pdf.groupby("random_group_id")["size"].cumsum()

    # Assign sub-group numbers by integer-dividing the cumulative sum by the limit.
    # This is the approximate part: a sub-group's total can exceed the limit
    # (by less than the size of its first row in the worst case).
    pdf[group_col] = pdf["cum_size"] // limit


    # Drop the cumulative sum column
    pdf = pdf.drop("cum_size", axis=1)

    print("after")
    print(pdf)

    return pdf


# Apply the pandas UDF to the DataFrame
grouped_df = df.groupby("random_group_id").apply(group_by_limit)


grouped_df.show()

## Verify correctness of our algorithm.
result_grouped = grouped_df.groupBy("random_group_id", "sub_group").agg(sum("size"))
result_grouped.orderBy("random_group_id", "sub_group").show(n=100, truncate=False)

Output:

avg_size=6.0
rows_per_group=3
df.count()=18
nums_of_group=6
+---------------+
|random_group_id|
+---------------+
|5              |
|3              |
|1              |
|2              |
|4              |
+---------------+

5
+---------------+--------+
|random_group_id|count(1)|
+---------------+--------+
|5              |3       |
|3              |4       |
|1              |5       |
|2              |2       |
|4              |4       |
+---------------+--------+

+---+----+---------------+
|id |size|random_group_id|
+---+----+---------------+
|1  |3   |5              |
|2  |6   |3              |
|3  |8   |1              |
|4  |5   |5              |
|5  |7   |1              |
|6  |7   |1              |
|11 |3   |3              |
|12 |6   |3              |
|13 |8   |5              |
|14 |5   |2              |
|15 |7   |4              |
|16 |7   |1              |
|21 |3   |4              |
|22 |6   |2              |
|23 |8   |3              |
|24 |5   |1              |
|25 |7   |4              |
|26 |7   |4              |
+---+----+---------------+

+---+----+---------------+---------+
| id|size|random_group_id|sub_group|
+---+----+---------------+---------+
|  3|   8|              1|        0|
|  5|   7|              1|        0|
|  6|   7|              1|        1|
| 16|   7|              1|        1|
| 24|   5|              1|        1|
| 14|   5|              2|        0|
| 22|   6|              2|        0|
|  2|   6|              3|        0|
| 11|   3|              3|        0|
| 12|   6|              3|        0|
| 23|   8|              3|        1|
| 15|   7|              4|        0|
| 21|   3|              4|        0|
| 25|   7|              4|        0|
| 26|   7|              4|        1|
|  1|   3|              5|        0|
|  4|   5|              5|        0|
| 13|   8|              5|        0|
+---+----+---------------+---------+

+---------------+---------+---------+
|random_group_id|sub_group|sum(size)|
+---------------+---------+---------+
|1              |0        |15       |
|1              |1        |19       |
|2              |0        |11       |
|3              |0        |15       |
|3              |1        |8        |
|4              |0        |17       |
|4              |1        |7        |
|5              |0        |16       |
+---------------+---------+---------+
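Note that @pandas_udf with PandasUDFType.GROUPED_MAP is deprecated in Spark 3.x; the same grouped-map step can be expressed with groupby(...).applyInPandas(...). A minimal sketch of that variant, assuming df (with its random_group_id column), payload_limit and result_schema from above are still in scope:

def group_by_limit_fn(pdf):
    # Same approximate logic as the UDF above: cumulative size within the bucket,
    # then integer-divide by the payload limit to get a sub_group number.
    pdf = pdf.sort_values("id")  # optional: makes the cumulative sum deterministic
    pdf["cum_size"] = pdf["size"].cumsum()
    pdf["sub_group"] = pdf["cum_size"] // payload_limit
    return pdf.drop(columns=["cum_size"])

# applyInPandas takes a plain function plus the output schema (Spark 3.0+)
grouped_df = df.groupby("random_group_id").applyInPandas(group_by_limit_fn, result_schema)
grouped_df.show()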