pyspark rdd以最小的年龄获得最大的频率

问题描述 投票:0回答:1

我的rdd如下:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

我正在尝试通过使用类似以下代码的最高频率代码将每个id减少到仅1条记录:

rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

此实现存在一个问题,它没有考虑年龄,因此,例如,如果一个ID包含多个频率为2的代码,则将采用最后一个代码。

为了说明这个问题,请考虑减少的ID:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

我的代码将输出:

[(2, u'"V81.2"')]

当我想输出它时:

[(2, u'"388.400000"')]

因为这两个代码的频率相同,所以代码388.400000的使用年龄较短,并且显示在最前面。

通过在.reduceByKey()之后添加此行:

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

我能够过滤出年龄最小的人,但是我只考虑年龄最小的人,而不是所有代码来计算其频率。在set(x)是x的集合后,[max(zip((x.count(i)for i in set(x)),set(x))))]之后,我无法应用相同/相似的逻辑[1],不考虑年龄。

我应该补充,我不想只使用频率最高的第一个代码,我想使用年龄最小的频率最高的代码,或者如果可能的话,使用出现在最前面的代码,使用只有rdd动作。

SQL中的等效代码如下:

SELECT id, code, MIN(age) AS age, count(*) AS cnt,
             ROW_NUMBER() OVER (PARTITION BY id order by count(*) DESC) AS seqnum
      FROM tbl
      GROUP BY id, code
     ) da
WHERE seqnum = 1

我非常感谢您的帮助。

apache-spark pyspark count rdd reduce
1个回答
0
投票

如果将rdd转换为数据帧是一种选择,我认为这种方法可以解决您的问题:

from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
df = rdd.toDF()
w = Window.partitionBy('id').orderBy('age')
df = df.withColumn('row_number', row_number.over(w)).where(col('row_number') == 1).drop('row_number')
© www.soinside.com 2019 - 2024. All rights reserved.