如何在pyspark中找到最高值计数行?

问题描述 投票:0回答:1

我有这个日期框

类型 p类型 时间 长度 垃圾箱
公元前 1 2023-07-02 18:07:28 8283 CB 1
公元前 1 2023-07-12 16:55:45 3402 7.2 3331
公元前 1 2023-07-02 18:07:28 8283 7.2 8209
公元前 1 2023-07-19 21:51:02 9671 3.3 1
公元前 1 2023-07-02 18:07:28 8283 3.3 1
公元前 1 2023-07-08 01:32:12 9642 CB 17
公元前 1 2023-07-19 21:51:02 9671 CB 47
公元前 1 2023-07-19 21:51:02 9671 7.2 9611
公元前 1 2023-07-08 01:32:12 9642 7.2 9613

对于按(类型,ptype,时间,长度)分组的每个组,我想要具有最高 value_count 的 bin 的值

因此预期输出如下

类型 p类型 时间 长度 垃圾箱
公元前 1 2023-07-02 18:07:28 8283 7.2 8209
公元前 1 2023-07-08 01:32:12 9642 7.2 9613
公元前 1 2023-07-12 16:55:45 3402 7.2 3331
公元前 1 2023-07-19 21:51:02 9671 7.2 9611

对于每组列 bin 值 7.2 是最高的。

请帮忙。

我尝试了以下代码,但没有得到正确的结果,它在每次运行时都会产生不同的结果

df.groupBy('type', 'ptype', 'time', 'length', 'bins') \
        .count() \
        .orderBy( 'type', 'ptype', 'time', 'length', desc('count')) \
        .groupBy( 'type', 'ptype', 'time', 'length') \
        .agg(first('bins').alias('bins')).show()
python dataframe apache-spark pyspark
1个回答
0
投票

您可以通过窗口函数和后置过滤简单地实现这一点。

import datetime as dt
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import Row, Window
from pyspark.sql import functions as f

data = [
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 2, 18, 7, 28),
        length=8283,
        bins="CB",
        count=1
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 12, 16, 55, 45),
        length=3402,
        bins="7.2",
        count=3331
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 2, 18, 7, 28),
        length=8283,
        bins="7.2",
        count=8209
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 19, 21, 51, 2),
        length=9671,
        bins="3.3",
        count=1
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 2, 18, 7, 28),
        length=8283,
        bins="3.3",
        count=1
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 8, 1, 32, 12),
        length=9642,
        bins="CB",
        count=17
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 19, 21, 51, 2),
        length=9671,
        bins="CB",
        count=47
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 19, 21, 51, 2),
        length=9671,
        bins="7.2",
        count=9611
    ),
    Row(
        type="BC",
        ptype=1,
        time=dt.datetime(2023, 7, 8, 1, 32, 12),
        length=9642,
        bins="7.2",
        count=9613
    )
]


schema = StructType([
    StructField(name="type", dataType=StringType()),
    StructField(name="ptype", dataType=IntegerType()),
    StructField(name="time", dataType=TimestampType()),
    StructField(name="length", dataType=IntegerType()),
    StructField(name="bin", dataType=StringType()),
    StructField(name="count", dataType=IntegerType()),
])

df = spark.createDataFrame(data=data, schema=schema)

df_new = (
    df
    .withColumn(
        "max_count", 
        f.max("count").over(
            Window.partitionBy("type", "ptype", "time", "length")
        )
    )
    .filter("max_count = count")
    .drop("max_count")
)

df_new.display()



© www.soinside.com 2019 - 2024. All rights reserved.