我的rdd如下:
[{'age': 2.18430371791803,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.80033330216659,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.8222365762732,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{...}]
我正在尝试通过使用类似以下代码的最高频率代码将每个id减少到仅1条记录:
rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])
此实现存在一个问题,它没有考虑年龄,因此,例如,如果一个ID包含多个频率为2的代码,则将采用最后一个代码。
为了说明这个问题,请考虑减少的ID:
(u'"000PZ7S2G"',
[(4.3218651186303, u'"388.400000"'),
(4.34924421126357, u'"388.400000"'),
(4.3218651186303, u'"389.900000"'),
(4.34924421126357, u'"389.900000"'),
(13.3667102491139, u'"794.310000"'),
(5.99897016368982, u'"995.300000"'),
(6.02634923989903, u'"995.300000"'),
(4.3218651186303, u'"V72.19"'),
(4.34924421126357, u'"V72.19"'),
(13.3639723398581, u'"V81.2"'),
(13.3667102491139, u'"V81.2"')])
我的代码将输出:
[(2, u'"V81.2"')]
当我想输出它时:
[(2, u'"388.400000"')]
因为这两个代码的频率相同,所以代码388.400000的使用年龄较短,并且显示在最前面。
通过在.reduceByKey()之后添加此行:
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))
我能够过滤出年龄最小的人,但是我只考虑年龄最小的人,而不是所有代码来计算其频率。在set(x)是x的集合后,[max(zip((x.count(i)for i in set(x)),set(x))))]之后,我无法应用相同/相似的逻辑[1],不考虑年龄。
我应该补充,我不想只使用频率最高的第一个代码,我想使用年龄最小的频率最高的代码,或者如果可能的话,使用出现在最前面的代码,使用只有rdd动作。
SQL中的等效代码如下:
SELECT id, code, MIN(age) AS age, count(*) AS cnt,
ROW_NUMBER() OVER (PARTITION BY id order by count(*) DESC) AS seqnum
FROM tbl
GROUP BY id, code
) da
WHERE seqnum = 1
我非常感谢您的帮助。
如果将rdd转换为数据帧是一种选择,我认为这种方法可以解决您的问题:
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
df = rdd.toDF()
w = Window.partitionBy('id').orderBy('age')
df = df.withColumn('row_number', row_number.over(w)).where(col('row_number') == 1).drop('row_number')