PySpark: how to avoid explode for groups in top-level and nested structures (code optimization)


Question

I want to compute statistics over request data, grouped both by a value at the top level and by a value in a nested level. The main problem with the explode + join + 3x groupby approach below is that the code is too slow on large data (~100 GB).

Sample data:

import pyspark.sql.types as T

rows = [
    {"id": 1, "typeId": 1, "items":[
        {"itemType": 1,"flag": False,"event": None},
        {"itemType": 3,"flag": True,"event":[{"info1": ""},{"info1": ""}]},
        {"itemType": 3,"flag": True,"event":[{"info1": ""},{"info1": ""}]},
    ]},
    {"id": 2, "typeId": 2, "items":None},
    {"id": 3, "typeId": 1, "items":[
        {"itemType": 1,"flag": False,"event": None},
        {"itemType": 6,"flag": False,"event":[{"info1": ""}]},
        {"itemType": 6,"flag": False,"event":None},
    ]},
    {"id": 4, "typeId": 2, "items":[
        {"itemType": 1,"flag": True,"event":[{"info1": ""}]},
    ]},
    {"id": 5, "typeId": 3, "items":None},
]

schema = T.StructType([
   T.StructField("id", T.IntegerType(), False),
   T.StructField("typeId", T.IntegerType()),
   T.StructField("items", T.ArrayType(T.StructType([
           T.StructField("itemType", T.IntegerType()),
           T.StructField("flag", T.BooleanType()),
           T.StructField("event", T.ArrayType(T.StructType([
                   T.StructField("info1", T.StringType()),
           ]))),
       ])), True),
])

# assumes an existing SparkSession bound to `spark`
df = spark.createDataFrame(rows, schema)
df.printSchema()

Its structure:

root
 |-- id: integer (nullable = false)
 |-- typeId: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemType: integer (nullable = true)
 |    |    |-- flag: boolean (nullable = true)
 |    |    |-- event: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- info1: string (nullable = true)

I want to perform these calculations for each typeId and, within it, for each items.itemType:

  • total number of rows (requests)
  • total number of rows (requests) that contain some items
  • total number of rows (requests) that contain some items with items.flag==True
  • total number of items
  • total number of flagged items (items.flag==True)
  • total number of events on items (sum(size("items.event")))

Code

Getting the total number of requests per typeId is simple; in the real analysis, layer1_groups contains more category columns:

import pyspark.sql.functions as F

layer1_groups = ["typeId"]

# get count for groups in top layer
totaldf = df.groupby(layer1_groups).agg(F.count(F.lit(1)).alias("requests"))

For later calculations (e.g., ratios against the numbers computed on the nested groups), join these counts back to the original dataframe:

df = df.join(totaldf, layer1_groups)
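Since totaldf collapses to one row per group, it is usually tiny, so a broadcast hint can turn this shuffle join into a map-side join. A minimal sketch, assuming the distinct layer1_groups combinations fit comfortably in executor memory:

# hypothetical optimization: broadcast the small aggregated side of the join
df = df.join(F.broadcast(totaldf), layer1_groups)

Spark may already broadcast automatically when totaldf's estimated size is below spark.sql.autoBroadcastJoinThreshold; the explicit hint just makes it deterministic.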

Explode items to allow grouping by the nested items.itemType:
# explode_outer keeps requests whose items array is NULL
exploded_df = df.withColumn("I", F.explode_outer("items")).select("*", "I.*").drop("items", "I")
# add the number of events per item; size() returns -1 for NULL arrays, so clamp to 0
exploded_df = exploded_df.withColumn("eSize", F.greatest(F.size("event"), F.lit(0)))
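On 100 GB of input, a cheap win is to prune columns before exploding so the multiplied rows stay narrow. A sketch, assuming the later steps only need the grouping keys, requests, and the item fields:

# keep only the columns the later groupbys use, then explode the slim frame
slim = df.select("id", *layer1_groups, "requests", "items")
exploded_df = (
    slim.withColumn("I", F.explode_outer("items"))
        .select("id", *layer1_groups, "requests", "I.itemType", "I.flag", "I.event")
        .withColumn("eSize", F.greatest(F.size("event"), F.lit(0)))
)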

First, collect statistics per request (group by "id"), because in the later calculations I only want to count a request when it has flagged items, etc.:

layer2_groups = ["itemType"]

# "requests" is constant within each id, so first() just carries it through
each_requests = exploded_df.groupby(["id", *layer1_groups, *layer2_groups]).agg(
    F.first("requests").alias("requests"),
    F.count(F.lit(1)).alias("ItemCount"),
    F.sum(F.col("flag").cast(T.ByteType())).alias("fItemCount"),
    F.sum("eSize").alias("eCount"),
)

Finally, complete the aggregation without "id" in the grouping keys:

# final results: drop "id" from the grouping keys
requests_results = each_requests.groupby([*layer1_groups, *layer2_groups]).agg(
    F.first("requests").alias("requests"),
    F.count_if(F.col("ItemCount") > 0).alias("requestsWithItems"),
    F.count_if(F.col("fItemCount") > 0).alias("requestsWith_fItems"),
    F.sum("ItemCount").alias("ItemCount"),
    F.sum("fItemCount").alias("fItemCount"),
    F.sum("eCount").alias("eCount"),
)
requests_results.show()

The result is:

+------+--------+--------+-----------------+-------------------+---------+----------+------+
|typeId|itemType|requests|requestsWithItems|requestsWith_fItems|ItemCount|fItemCount|eCount|
+------+--------+--------+-----------------+-------------------+---------+----------+------+
|     1|       1|       2|                2|                  0|        2|         0|     0|
|     1|       3|       2|                1|                  1|        2|         2|     4|
|     1|       6|       2|                1|                  0|        2|         0|     1|
|     2|       1|       2|                1|                  1|        1|         1|     1|
|     2|    NULL|       2|                1|                  0|        1|      NULL|     0|
|     3|    NULL|       1|                1|                  0|        1|      NULL|     0|
+------+--------+--------+-----------------+-------------------+---------+----------+------+

Full code

Gist: https://gist.github.com/vanheck/bfcadf7396d765ddd2fff5f544fd7cf2

Question

Is there any way to make this code faster? Or can I avoid the explode function to get these statistics?

apache-spark pyspark nested explode
1 Answer

0 votes

You don't need explode to get the statistics you want. I tried the approach below locally and it worked. I've pasted my results - adjust it to your requirements.

from pyspark.sql import functions as F

# Function to count items based on a condition
def count_items_condition(col, condition):
    return F.size(F.expr(f"filter({col}, item -> {condition})"))

# Function to sum the size of "event" arrays in "items"
def sum_events_size(col):
    return F.expr(f"aggregate({col}, 0, (acc, item) -> acc + size(item.event))")

# Add calculations as new columns
df = df.withColumn("total_requests", F.lit(1)) \
       .withColumn("total_requests_with_item", F.when(F.size("items") > 0, 1).otherwise(0)) \
       .withColumn("total_requests_with_item_flag_true", count_items_condition("items", "item.flag")) \
       .withColumn("total_items", F.size("items")) \
       .withColumn("total_flagged_items", count_items_condition("items", "item.flag")) \
       .withColumn("total_events_on_items", sum_events_size("items"))

# Group by typeId and sum the calculations
result = df.groupBy("typeId") \
           .agg(
               F.sum("total_requests").alias("total_requests"),
               F.sum("total_requests_with_item").alias("total_requests_with_item"),
               F.sum("total_requests_with_item_flag_true").alias("total_requests_with_item_flag_true"),
               F.sum("total_items").alias("total_items"),
               F.sum("total_flagged_items").alias("total_flagged_items"),
               F.sum("total_events_on_items").alias("total_events_on_items")
           )

result.show()

+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
|typeId|total_requests|total_requests_with_item|total_requests_with_item_flag_true|total_items|total_flagged_items|total_events_on_items|
+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
|     1|             2|                       2|                                 2|          6|                  2|                    2|
|     2|             2|                       1|                                 0|          0|                  0|                    1|
|     3|             1|                       0|                                -1|         -1|                 -1|                 NULL|
+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
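Note the -1 and NULL values in this output: size() returns -1 for a NULL array by default, and aggregate() over a NULL array returns NULL, so the helpers need null guards before the sums match the question's numbers. A minimal sketch of hardened helpers (same dataframe assumed; the _safe names are illustrative):

from pyspark.sql import functions as F

# size() returns -1 for NULL arrays by default; clamp so NULL and empty both count as 0
def safe_size(col):
    return F.greatest(F.size(col), F.lit(0))

# count items matching a condition, treating a NULL items array as 0 matches
def count_items_condition_safe(col, condition):
    return safe_size(F.expr(f"filter({col}, item -> {condition})"))

# sum event sizes, guarding both a NULL items array and NULL item.event arrays
def sum_events_size_safe(col):
    total = F.expr(
        f"aggregate({col}, 0, (acc, item) -> acc + greatest(size(item.event), 0))"
    )
    return F.coalesce(total, F.lit(0))  # NULL items array -> 0

Also note that total_requests_with_item_flag_true above counts flagged items rather than requests (it reuses the same expression as total_flagged_items); to count requests, the per-row value should be 0 or 1:

# a request counts as "with flagged item" when at least one item matches
df = df.withColumn(
    "total_requests_with_item_flag_true",
    (count_items_condition_safe("items", "item.flag") > 0).cast("int"),
)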