我有这个日期框
类型 | p类型 | 时间 | 长度 | 垃圾箱 | 数 |
---|---|---|---|---|---|
公元前 | 1 | 2023-07-02 18:07:28 | 8283 | CB | 1 |
公元前 | 1 | 2023-07-12 16:55:45 | 3402 | 7.2 | 3331 |
公元前 | 1 | 2023-07-02 18:07:28 | 8283 | 7.2 | 8209 |
公元前 | 1 | 2023-07-19 21:51:02 | 9671 | 3.3 | 1 |
公元前 | 1 | 2023-07-02 18:07:28 | 8283 | 3.3 | 1 |
公元前 | 1 | 2023-07-08 01:32:12 | 9642 | CB | 17 |
公元前 | 1 | 2023-07-19 21:51:02 | 9671 | CB | 47 |
公元前 | 1 | 2023-07-19 21:51:02 | 9671 | 7.2 | 9611 |
公元前 | 1 | 2023-07-08 01:32:12 | 9642 | 7.2 | 9613 |
对于按(类型,ptype,时间,长度)分组的每个组,我想要具有最高 value_count 的 bin 的值
因此预期输出如下
类型 | p类型 | 时间 | 长度 | 垃圾箱 | 数 |
---|---|---|---|---|---|
公元前 | 1 | 2023-07-02 18:07:28 | 8283 | 7.2 | 8209 |
公元前 | 1 | 2023-07-08 01:32:12 | 9642 | 7.2 | 9613 |
公元前 | 1 | 2023-07-12 16:55:45 | 3402 | 7.2 | 3331 |
公元前 | 1 | 2023-07-19 21:51:02 | 9671 | 7.2 | 9611 |
对于每组列 bin 值 7.2 是最高的。
请帮忙。
我尝试了以下代码,但没有得到正确的结果,它在每次运行时都会产生不同的结果
df.groupBy('type', 'ptype', 'time', 'length', 'bins') \
.count() \
.orderBy( 'type', 'ptype', 'time', 'length', desc('count')) \
.groupBy( 'type', 'ptype', 'time', 'length') \
.agg(first('bins').alias('bins')).show()
您可以通过窗口函数和后置过滤简单地实现这一点。
import datetime as dt
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import Row, Window
from pyspark.sql import functions as f
data = [
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 2, 18, 7, 28),
length=8283,
bins="CB",
count=1
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 12, 16, 55, 45),
length=3402,
bins="7.2",
count=3331
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 2, 18, 7, 28),
length=8283,
bins="7.2",
count=8209
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 19, 21, 51, 2),
length=9671,
bins="3.3",
count=1
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 2, 18, 7, 28),
length=8283,
bins="3.3",
count=1
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 8, 1, 32, 12),
length=9642,
bins="CB",
count=17
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 19, 21, 51, 2),
length=9671,
bins="CB",
count=47
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 19, 21, 51, 2),
length=9671,
bins="7.2",
count=9611
),
Row(
type="BC",
ptype=1,
time=dt.datetime(2023, 7, 8, 1, 32, 12),
length=9642,
bins="7.2",
count=9613
)
]
schema = StructType([
StructField(name="type", dataType=StringType()),
StructField(name="ptype", dataType=IntegerType()),
StructField(name="time", dataType=TimestampType()),
StructField(name="length", dataType=IntegerType()),
StructField(name="bin", dataType=StringType()),
StructField(name="count", dataType=IntegerType()),
])
df = spark.createDataFrame(data=data, schema=schema)
df_new = (
df
.withColumn(
"max_count",
f.max("count").over(
Window.partitionBy("type", "ptype", "time", "length")
)
)
.filter("max_count = count")
.drop("max_count")
)
df_new.display()