How to find common pairs in a PySpark RDD regardless of order?


I want to find the pairs that have been in contact with each other. Here is the data:

Input is
K-> M, H     // (this means K sent an email to M and H)
M-> K, E
H-> F
B-> T, H
E-> K, H
F-> K, H, E
A-> Z

The expected output is:

Output:
K, M // (this means K has emailed M and M has also emailed K)
H, F

Here is the code I wrote:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.regression import LinearRegression
import re
from itertools import combinations

spark = SparkContext("local", "DoubleRDD")


def findpairs(ls):
    lst = []
    for i in range(0,len(ls)-1):
        for j in range(i+1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])   
    return(lst)

text  = spark.textFile("path to the .txt")
text  = text.map(lambda s: s.replace("->",","))
text  = text.map(lambda s: s.replace(",",""))
text  = text.map(lambda s: s.replace(" ",""))
pairs = text.flatMap(lambda x:  [(x[0],y) for y in x[1:]])
commonpairs = pairs.filter(lambda x: findpairs(x))
pairs.collect()
The output is: []
1 Answer

Read the text file as a Spark dataframe:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('data.txt', header=False, sep='-> ').toDF('x', 'y')

# +---+-------+
# |  x|      y|
# +---+-------+
# |  K|   M, H|
# |  M|   K, E|
# |  H|      F|
# |  B|   T, H|
# |  E|   K, H|
# |  F|K, H, E|
# |  A|      Z|
# +---+-------+

Split and explode the recipient (y) column:

df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))

# +---+---+
# |  x|  y|
# +---+---+
# |  K|  M|
# |  K|  H|
# |  M|  K|
# |  M|  E|
# |  H|  F|
# |  B|  T|
# |  B|  H|
# |  E|  K|
# |  E|  H|
# |  F|  K|
# |  F|  H|
# |  F|  E|
# |  A|  Z|
# +---+---+

Self-join the dataframe so that the recipient on the left side matches the sender on the right side, then filter so that the sender on the left side also matches the recipient on the right side:

df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
df1 = df1.filter("left.x == right.y")

# +---+---+---+---+
# |  x|  y|  x|  y|
# +---+---+---+---+
# |  K|  M|  M|  K|
# |  M|  K|  K|  M|
# |  H|  F|  F|  H|
# |  F|  H|  H|  F|
# +---+---+---+---+
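For intuition: the join-then-filter step is just asking whether the reversed pair also appears in the edge list. A minimal plain-Python equivalent (the variable names are mine, not Spark API), using the exploded edges from the table above:

```python
# (sender, recipient) edges from the exploded dataframe above
edges = [('K', 'M'), ('K', 'H'), ('M', 'K'), ('M', 'E'), ('H', 'F'),
         ('B', 'T'), ('B', 'H'), ('E', 'K'), ('E', 'H'),
         ('F', 'K'), ('F', 'H'), ('F', 'E'), ('A', 'Z')]

edge_set = set(edges)
# A pair is mutual when its reverse is also an edge -- exactly what the
# self-join on left.y == right.x plus the filter left.x == right.y finds.
mutual = [(x, y) for (x, y) in edges if (y, x) in edge_set]
# mutual == [('K', 'M'), ('M', 'K'), ('H', 'F'), ('F', 'H')]
```

Note that each mutual pair still appears twice, once per direction, which is why the next deduplication step is needed.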

Drop the duplicate sender/recipient combinations:

df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
df1 = df1.dropDuplicates(['pairs']).drop('pairs')

# +---+---+
# |  x|  y|
# +---+---+
# |  H|  F|
# |  K|  M|
# +---+---+
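Since the question asks about RDDs, here is the same end-to-end logic sketched in plain Python (an illustrative sketch, not the Spark code itself). Each step mirrors an RDD operation: the parsing loop is a `flatMap`, the mutuality check is the self-join/filter, and the sorted-tuple canonicalisation corresponds to the `array_sort` + `dropDuplicates` step (a `distinct` in RDD terms):

```python
raw = ["K-> M, H", "M-> K, E", "H-> F", "B-> T, H",
       "E-> K, H", "F-> K, H, E", "A-> Z"]

# Parse "sender-> r1, r2" into (sender, recipient) edges (the explode step)
edges = []
for line in raw:
    sender, rest = line.split('->')
    for recipient in rest.split(','):
        edges.append((sender.strip(), recipient.strip()))

edge_set = set(edges)
# Keep mutual pairs; sorting each pair makes (K, M) and (M, K) identical,
# so the set comprehension deduplicates them (array_sort + dropDuplicates)
mutual = {tuple(sorted(e)) for e in edges if e[::-1] in edge_set}
# mutual == {('K', 'M'), ('F', 'H')}
```

This matches the expected output of K, M and H, F regardless of direction.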