使用元组中的随机密钥创建 Pyspark RDD

问题描述 投票:0回答:1

我正在研究 Apache Spark,发现了一些有趣的事情。当我使用键值对创建新的 rdd 时,其中键是从元组中随机选择的 - reducebykey 的结果不正确。

from pyspark.sql import SparkSession
import random
spark:SparkSession = SparkSession.builder.master("local[1]").appName("SparkNew").getOrCreate()
data = [1,2,3,4,5,6,7,8,9,10]
rdd = spark.sparkContext.parallelize(data)
indexes = ('a', 'b', 'c')
rdd2 = rdd.map(lambda x:(indexes[(random.randint(0,2))], 1))
rdd2.take(10)

例如,创建 rdd2 后,我得到了这个

[('c', 1),
 ('a', 1),
 ('b', 1),
 ('a', 1),
 ('a', 1),
 ('c', 1),
 ('a', 1),
 ('a', 1),
 ('c', 1),
 ('a', 1)]

在reduceByKey之后我得到了这个

[('c', 5), ('a', 2), ('b', 3)] 

这显然是不正确的。任何人都知道为什么会发生这种情况?是因为兰特吗?但为什么? 感谢您的帮助!

apache-spark pyspark rdd
1个回答
0
投票

如果您正在寻找一种在给定范围内生成随机值的方法,您可以使用均匀随机分布函数 F.rand(),然后按如下所示的范围对其进行缩放。

from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F


sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = list(range(100000))
data1 = [[x] for x in data1]

df1Columns = ["id"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)

df1.show(n=5, truncate=False)

start = 0
end = 2

df2 = df1.withColumn("randInt", F.lit(F.floor(start + F.rand() * ( (end+1) - start) )))

df2.show(n=5, truncate=False)

print("cumulative count")
result = df2.select("randInt").groupBy("randInt").agg(F.count("randInt"))
result.show(n=10, truncate=False)

输出:

+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
+---+
only showing top 5 rows

+---+-------+
|id |randInt|
+---+-------+
|0  |1      |
|1  |1      |
|2  |1      |
|3  |0      |
|4  |1      |
+---+-------+
only showing top 5 rows

cumulative count
+-------+--------------+
|randInt|count(randInt)|
+-------+--------------+
|0      |33440         |
|1      |33143         |
|2      |33417         |
+-------+--------------+
© www.soinside.com 2019 - 2024. All rights reserved.