如何从PySpark中的单个元素的RDD创建一对RDD?

问题描述 投票:0回答:2

这是实际的管道。我正在向RDD加载文本。然后我把它清理干净。

rdd1 = sc.textFile("sometext.txt")

import re
import string

def Func(lines):
    lines = lines.lower() #make all text lowercase
    lines = re.sub('[%s]' % re.escape(string.punctuation), '', lines) #remove punctuation
    lines = re.sub('\w*\d\w*', '', lines) #remove numeric-containing strings
    lines = lines.split() #split lines
    return lines
rdd2 = rdd1.flatMap(Func)

stopwords = ['list of stopwords goes here'] 
rdd3 = rdd2.filter(lambda x: x not in stopwords) # filter out stopwords
rdd3.take(5) #resulting RDD

Out:['a',
     'b',
     'c',
     'd',
     'e']

我现在需要做的是马尔可夫链函数的开始。我想将每个元素与其连续元素配对,例如:

[('a','b'),('b','c'),('c','d'),('d','e')等...]

python apache-spark pyspark rdd
2个回答
0
投票

我认为您需要在RDD中指定元素的顺序,以确定2个元素如何被视为彼此“连续”。因为您的RDD可以包含多个分区,所以spark不会知道partition_1中的1个元素是否与partition_2中的另一个元素连续。

如果您事先知道了数据,则可以定义密钥以及2个元素是如何“连续”的。根据您从list创建rdd的示例,您可以使用索引作为键并进行连接。

"""you want to shift arr by 1 to the left, then join back to arr. Calculation based on index"""

arr = ['a','b','c','d','e','f']
rdd = sc.parallelize(arr, 2).zipWithIndex().cache() #cache if rdd is small 

original_rdd = rdd.map(lambda x: (x[1], x[0])) #create rdd with key=index, value=item in list

shifted_rdd = rdd.map(lambda x: (x[1]-1, x[0]))

results = original_rdd.join(shifted_rdd)
print(results.values().collect())

为了在join中获得更好的性能,您可以使用original_rddshifted_rdd的范围分区。


-1
投票

漂亮的下降方法。真的可以进行更多优化。

>>> rdd=sc.parallelize(['a','b','c','d','e','f'])
#zipping with Index to rip off odd and even elements, to group consecutive elements in future
>>> rdd_odd=rdd.zipWithIndex().filter(lambda (x,y):y%2!=0).map(lambda (x,y):x).coalesce(1)
>>> rdd_even=rdd.zipWithIndex().filter(lambda (x,y):y%2==0).map(lambda (x,y):x).coalesce(1)
>>> rdd_2=rdd_even.zip(rdd_odd)
>>> rdd_2.collect()
[('a', 'b'), ('c', 'd'), ('e', 'f')]

确保在rdd_1中拥有偶数个元素。这实际上将形成连续元素配对的基础。

© www.soinside.com 2019 - 2024. All rights reserved.