我有一个spark方法,正在运行flatMap
函数,该函数正在向我返回一个元组列表。元组中的键值是Timestamp
,并且值是dict
。
[(Timestamp('2000-01-01 00:00:00'),
{'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')}),
(Timestamp('2000-01-01 00:00:00'),
{'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')}),
(Timestamp('2000-01-01 00:00:00'),
{'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}),
(Timestamp('2000-01-02 00:00:00'),
{'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')}),
(Timestamp('2000-01-02 00:00:00'),
{'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')}),
(Timestamp('2000-01-02 00:00:00'),
{'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}),
(Timestamp('2000-01-03 00:00:00'),
{'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')}),
(Timestamp('2000-01-03 00:00:00'),
{'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')}),
(Timestamp('2000-01-03 00:00:00'),
{'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')}),
我正在尝试运行一个reduceByKey
函数,这会给我:
[ (Timestamp('2000-01-01 00:00:00'),
[{'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')},
{'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')},
{'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}]),
(Timestamp('2000-01-02 00:00:00'),
[{'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')},
{'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')},
{'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}]),
(Timestamp('2000-01-03 00:00:00'),
[{'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')},
{'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')},
{'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')}]) ]
到目前为止,我已经尝试过:output = rdd.flatMap(split_func).reduceByKey(lambda x, y: x+y).collect()
但我收到此错误:TypeError: unsupported operand type(s) for +: 'dict' and 'dict'
提前感谢!
这更多是python错误。如果d1
和d2
是词典,则d1 + d2
不起作用。但是,您可以执行{**d1, **d2}
。如果d1和d2具有相同的密钥,它将从d2中获取值。
所以你可以做output = rdd.flatMap(split_func).reduceByKey(lambda x, y: {**x, **y}).collect()
但是,您得到的是元组列表。因此,在这种情况下,我认为groupByKey更好:output = rdd.flatMap(split_func).groupByKey().mapValues(list).collect()