I want to compute some statistics over request data, grouped by a value in the top layer and a value in a nested layer. The main problem with the explode-join and 3x groupby approach below is that it is too slow on big data (100 GB).
Sample data:
import pyspark.sql.types as T
rows = [
{"id": 1, "typeId": 1, "items":[
{"itemType": 1,"flag": False,"event": None},
{"itemType": 3,"flag": True,"event":[{"info1": ""},{"info1": ""}]},
{"itemType": 3,"flag": True,"event":[{"info1": ""},{"info1": ""}]},
]},
{"id": 2, "typeId": 2, "items":None},
{"id": 3, "typeId": 1, "items":[
{"itemType": 1,"flag": False,"event": None},
{"itemType": 6,"flag": False,"event":[{"info1": ""}]},
{"itemType": 6,"flag": False,"event":None},
]},
{"id": 4, "typeId": 2, "items":[
{"itemType": 1,"flag": True,"event":[{"info1": ""}]},
]},
{"id": 5, "typeId": 3, "items":None},
]
schema = T.StructType([
T.StructField("id", T.IntegerType(), False),
T.StructField("typeId", T.IntegerType()),
T.StructField("items", T.ArrayType(T.StructType([
T.StructField("itemType", T.IntegerType()),
T.StructField("flag", T.BooleanType()),
T.StructField("event", T.ArrayType(T.StructType([
T.StructField("info1", T.StringType()),
]))),
])), True),
])
df = spark.createDataFrame(rows, schema)
df.printSchema()
Its structure:
root
|-- id: integer (nullable = false)
|-- typeId: integer (nullable = true)
|-- items: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemType: integer (nullable = true)
| | |-- flag: boolean (nullable = true)
| | |-- event: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- info1: string (nullable = true)
I want to perform these calculations for each typeId and per nested items.itemType:
Getting the total number of requests for each typeId is simple (in the real analysis, layer1_groups contains more category columns):
import pyspark.sql.functions as F
layer1_groups = ["typeId"]
# get count for groups in top layer
totaldf = df.groupby(layer1_groups).agg(F.count(F.lit(1)).alias("requests"))
For future calculations (e.g., ratios between numbers computed on the nested groups), join these counts back to the original DataFrame:
df = df.join(totaldf, layer1_groups)
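Since totaldf only holds one row per group, a broadcast join avoids shuffling the large side here (a minimal sketch, assuming the number of groups stays small enough to broadcast):
# broadcast the small per-group counts so the 100GB side is not shuffled for the join
df = df.join(F.broadcast(totaldf), layer1_groups)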
Explode items to allow grouping by the nested items.itemType:
exploded_df = df.withColumn("I", F.explode_outer("items")).select("*","I.*").drop("items","I")
# add another per-item metric: the number of events;
# greatest(size(event), 0) clamps the -1 that size() returns for a NULL array
exploded_df = exploded_df.withColumn("eSize", F.greatest(F.size("event"), F.lit(0)))
Get statistics for each request (groupby "id"), because in later calculations I only want to count requests that have flagged items, and so on:
layer2_groups = ["itemType"]
each_requests = exploded_df.groupby(["id", *layer1_groups, *layer2_groups]).agg(
F.first("requests").alias("requests"),
F.count(F.lit(1)).alias("ItemCount"),
F.sum(F.col("flag").cast(T.ByteType())).alias("fItemCount"),
F.sum("eSize").alias("eCount"),
)
Finish with a grouping that no longer includes "id":
# results without the per-request "id" grouping
requests_results = each_requests.groupby([*layer1_groups, *layer2_groups]).agg(
F.first("requests").alias("requests"),
F.count_if(F.col("ItemCount")>0).alias("requestsWithItems"),
F.count_if(F.col("fItemCount")>0).alias("requestsWith_fItems"),
F.sum("ItemCount").alias("ItemCount"),
F.sum("fItemCount").alias("fItemCount"),
F.sum("eCount").alias("eCount"),
).show()
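(Side note: F.count_if is only available from PySpark 3.5; on older versions the same count can be obtained by summing a cast comparison, a sketch:)
# equivalent of F.count_if(...) on PySpark < 3.5
F.sum((F.col("ItemCount") > 0).cast("int")).alias("requestsWithItems")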
The result is:
+------+--------+--------+-----------------+-------------------+---------+----------+------+
|typeId|itemType|requests|requestsWithItems|requestsWith_fItems|ItemCount|fItemCount|eCount|
+------+--------+--------+-----------------+-------------------+---------+----------+------+
| 1| 1| 2| 2| 0| 2| 0| 0|
| 1| 3| 2| 1| 1| 2| 2| 4|
| 1| 6| 2| 1| 0| 2| 0| 1|
| 2| 1| 2| 1| 1| 1| 1| 1|
| 2| NULL| 2| 1| 0| 1| NULL| 0|
| 3| NULL| 1| 1| 0| 1| NULL| 0|
+------+--------+--------+-----------------+-------------------+---------+----------+------+
Gist: https://gist.github.com/vanheck/bfcadf7396d765ddd2fff5f544fd7cf2
Is there any way to make this code faster? Or can I avoid the explode function to get these statistics?
You can get the statistics you need without exploding. I tried the approach below locally and it worked. I've pasted my results - please adjust it to your requirements.
from pyspark.sql import functions as F
# Function to count items based on a condition
def count_items_condition(col, condition):
return F.size(F.expr(f"filter({col}, item -> {condition})"))
# Function to sum the size of "event" arrays in "items"
def sum_events_size(col):
return F.expr(f"aggregate({col}, 0, (acc, item) -> acc + size(item.event))")
# Add calculations as new columns
df = df.withColumn("total_requests", F.lit(1)) \
.withColumn("total_requests_with_item", F.when(F.size("items") > 0, 1).otherwise(0)) \
.withColumn("total_requests_with_item_flag_true", count_items_condition("items", "item.flag")) \
.withColumn("total_items", F.size("items")) \
.withColumn("total_flagged_items", count_items_condition("items", "item.flag")) \
.withColumn("total_events_on_items", sum_events_size("items"))
# Group by typeId and sum the calculations
result = df.groupBy("typeId") \
.agg(
F.sum("total_requests").alias("total_requests"),
F.sum("total_requests_with_item").alias("total_requests_with_item"),
F.sum("total_requests_with_item_flag_true").alias("total_requests_with_item_flag_true"),
F.sum("total_items").alias("total_items"),
F.sum("total_flagged_items").alias("total_flagged_items"),
F.sum("total_events_on_items").alias("total_events_on_items")
)
result.show()
+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
|typeId|total_requests|total_requests_with_item|total_requests_with_item_flag_true|total_items|total_flagged_items|total_events_on_items|
+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
| 1| 2| 2| 2| 6| 2| 2|
| 2| 2| 1| 0| 0| 0| 1|
| 3| 1| 0| -1| -1| -1| NULL|
+------+--------------+------------------------+----------------------------------+-----------+-------------------+---------------------+
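The -1 and NULL values in the last rows come from applying size() and aggregate() to NULL arrays: with Spark's default spark.sql.legacy.sizeOfNull=true, size(NULL) returns -1, and aggregate(NULL, ...) returns NULL. If you want zeros there instead, the helpers can be made null-safe, for example (a sketch reusing the definitions above; note also that total_requests_with_item_flag_true as written counts flagged items per request, same as total_flagged_items, rather than giving a 0/1 per-request indicator):
# null-safe variants: clamp size() with greatest(..., 0) and coalesce the
# aggregate over a NULL items array to 0
def count_items_condition_safe(col, condition):
    return F.greatest(F.size(F.expr(f"filter({col}, item -> {condition})")), F.lit(0))
def sum_events_size_safe(col):
    return F.coalesce(
        F.expr(f"aggregate({col}, 0, (acc, item) -> acc + greatest(size(item.event), 0))"),
        F.lit(0),
    )
# per-request 0/1 indicator for "has at least one flagged item"
flag_indicator = (count_items_condition_safe("items", "item.flag") > 0).cast("int")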