How to use reduceByKey in pyspark to get multiple keys and a single value [duplicate]


I'm using Jupyter on Ubuntu.

I have the following problem; here is my code:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
ut = sc.textFile("hdfs://localhost:54310/hduser/firstnames")
rows = ut.map(lambda line: line.split(";"))
res = rows.filter(lambda row: row[2] >= "2000" and row[2] <= "2004")
res = res.map(lambda row: ({row[1],row[2]},int(row[3])))

Output:

[({'2001', 'Brussel'}, 9),
 ({'2001', 'Brussel'}, 104),
 ({'2001', 'Vlaanderen'}, 16),
 ({'2002', 'Brussel'}, 12), ...]

I need the output to look like this:

[({'2001', 'Brussel'}, 113),
 ({'2001', 'Vlaanderen'}, 16),
 ({'2002', 'Brussel'}, 12)]

I've tried a few things with reduceByKey before and have looked at many questions about it, but I can't figure it out. Thanks in advance.

pyspark
1 Answer

As explained by zero323 in A list as a key for PySpark's reduceByKey, keys have to implement the hash method. You can either use tuples:

>>> from operator import add
... 
... sc.parallelize([
...     (('2001', 'Brussel'), 9), (('2001', 'Brussel'), 104),
...     (('2001', 'Vlaanderen'), 16), (('2002', 'Brussel'), 12)
... ]).reduceByKey(add).take(2)
... 
[(('2002', 'Brussel'), 12), (('2001', 'Brussel'), 113)]

Replace:

res.map(lambda row: ({row[1],row[2]},int(row[3])))

with:

res.map(lambda row: ((row[1], row[2]), int(row[3])))
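
Putting it together, a minimal sketch of the corrected pipeline from the question (assuming the same HDFS path and ";"-separated column layout, with region in column 1, year in column 2, and count in column 3; the sample output is illustrative and its order is not guaranteed):

from operator import add
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rows = sc.textFile("hdfs://localhost:54310/hduser/firstnames") \
         .map(lambda line: line.split(";"))

res = (rows
       .filter(lambda row: "2000" <= row[2] <= "2004")
       .map(lambda row: ((row[1], row[2]), int(row[3])))  # tuple key is hashable
       .reduceByKey(add))                                  # sum counts per (region, year)

res.take(3)
# e.g. [(('Brussel', '2001'), 113), (('Vlaanderen', '2001'), 16), (('Brussel', '2002'), 12)]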

or replace the set with a frozenset:

>>> sc.parallelize([
...     (frozenset(['2001', 'Brussel']), 9), (frozenset(['2001', 'Brussel']), 104),
...     (frozenset(['2001', 'Vlaanderen']), 16), (frozenset(['2002', 'Brussel']), 12)
... ]).reduceByKey(add).take(2)

[(frozenset({'2002', 'Brussel'}), 12), (frozenset({'2001', 'Brussel'}), 113)]
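
A note on the difference: a tuple key is ordered while a frozenset key is not, so (row[1], row[2]) and (row[2], row[1]) would be distinct tuple keys, but their frozenset counterparts collapse into the same key:

>>> ('2001', 'Brussel') == ('Brussel', '2001')
False
>>> frozenset(['2001', 'Brussel']) == frozenset(['Brussel', '2001'])
True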