pyspark中的Rdd乘法?

问题描述 投票:-1回答:1

我有两个数据框,如下面:

数据帧1:(df1)

+---+----------+
|id |features  |
+---+----------+
|8  |[5, 4, 5] |
|9  |[4, 5, 2] |
+---+----------+

数据框2:(df2)

+---+----------+
|id |features  |
+---+----------+
|1  |[1, 2, 3] |
|2  |[4, 5, 6] |
+---+----------+

之后我已经转换成Df到Rdd

rdd1=df1.rdd

如果我正在做rdd1.collect()结果如下

[Row(id=8, f=[5, 4, 5]), Row(id=9, f=[4, 5, 2])]

rdd2=df2.rdd

broadcastedrddif = sc.broadcast(rdd2.collectAsMap())

现在,如果我正在做broadcastedrddif.value

{1: [1, 2, 3], 2: [4, 5, 6]}

现在我想做rdd1和broadcastedrddif的乘法和,即它应该返回如下的输出。

((8,[(1,(5*1+2*4+5*3)),(2,(5*4+4*5+5*6))]),(9,[(1,(4*1+5*2+2*3)),(2,(4*4+5*5+2*6)]) ))

所以我的最终输出应该是

((8,[(1,28),(2,70)]),(9,[(1,20),(2,53)]))

其中(1,28)是元组而不是浮点数。

请帮帮我。

apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql
1个回答
1
投票

我不明白你为什么使用sc.broadcast()但我还是用它...在这种情况下非常有用mapValues在最后一个RDD上,我使用列表理解来使用字典执行操作。

x1=sc.parallelize([[8,5,4,5], [9,4,5,2]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x1.collect()
x2=sc.parallelize([[1,1,2,3], [2,4,5,6]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x2.collect()
#I took immediately an RDD because is more simply to test
broadcastedrddif = sc.broadcast(x2.collectAsMap())
d2=broadcastedrddif.value

def sum_prod(x,y):
    c=0
    for i in range(0,len(x)):
        c+=x[i]*y[i]
    return c
x1.mapValues(lambda x: [(i, sum_prod(list(x),list(d2[i]))) for i in [k for k in d2.keys()]]).collect()
Out[19]: [(8, [(1, 28), (2, 70)]), (9, [(1, 20), (2, 53)])]
© www.soinside.com 2019 - 2024. All rights reserved.