I want to find the pairs that have contacted each other. Here is the data:
Input is
K-> M, H  // (this means K sent an email to M and H)
M-> K, E
H-> F
B-> T, H
E-> K, H
F-> K, H, E
A-> Z
Output:
K, M  // (this means K sent an email to M and M also sent one to K)
H, F
Here is the code I wrote:
from pyspark import SparkContext
spark = SparkContext("local", "DoubleRDD")
def findpairs(ls):
    lst = []
    for i in range(0, len(ls) - 1):
        for j in range(i + 1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])
    return lst
text = spark.textFile("path to the .txt")
text = text.map(lambda s: s.replace("->",","))
text = text.map(lambda s: s.replace(",",""))
text = text.map(lambda s: s.replace(" ",""))
pairs = text.flatMap(lambda x: [(x[0],y) for y in x[1:]])
commonpairs = pairs.filter(lambda x: findpairs(x))
commonpairs.collect()
The output is: `[]`
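The empty result comes from how `filter` is used: `findpairs` was written to scan a *whole list* of pairs for a pair and its reverse, but `filter` applies it to each 2-tuple individually, so the inner loops never find anything (comparing a single character against a reversed one-character tuple always fails). A quick demonstration of the difference, using the same `findpairs` from the question:

```python
# findpairs as written expects the *whole* list of pairs; inside
# RDD.filter it only ever sees one 2-tuple, so its loops find nothing.
def findpairs(ls):
    lst = []
    for i in range(0, len(ls) - 1):
        for j in range(i + 1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])
    return lst

all_pairs = [('K', 'M'), ('K', 'H'), ('M', 'K'), ('H', 'F'), ('F', 'H')]
print(findpairs(all_pairs))   # [('K', 'M'), ('H', 'F')] - works on the full list
print(findpairs(('K', 'M')))  # [] - what filter() actually passes in
```

So one minimal fix is to run it over the collected pairs, e.g. `findpairs(pairs.collect())`; a distributed alternative is to join the pair RDD with a swapped copy of itself. A cleaner way is the DataFrame approach below.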
Read the text file as a Spark DataFrame:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('data.txt', header=False, sep=r'-> ').toDF('x', 'y')
# +---+-------+
# | x| y|
# +---+-------+
# | K| M, H|
# | M| K, E|
# | H| F|
# | B| T, H|
# | E| K, H|
# | F|K, H, E|
# | A| Z|
# +---+-------+
Split and explode the recipient (`y`) column:
df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))
# +---+---+
# | x| y|
# +---+---+
# | K| M|
# | K| H|
# | M| K|
# | M| E|
# | H| F|
# | B| T|
# | B| H|
# | E| K|
# | E| H|
# | F| K|
# | F| H|
# | F| E|
# | A| Z|
# +---+---+
Self-join the DataFrame so that the recipient on the left side is the sender on the right side, then filter so that the sender on the left matches the recipient on the right:
df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
df1 = df1.filter("left.x == right.y")
# +---+---+---+---+
# | x| y| x| y|
# +---+---+---+---+
# | K| M| M| K|
# | M| K| K| M|
# | H| F| F| H|
# | F| H| H| F|
# +---+---+---+---+
Remove duplicate sender/recipient combinations:
df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
df1 = df1.dropDuplicates(['pairs']).drop('pairs')
# +---+---+
# | x| y|
# +---+---+
# | H| F|
# | K| M|
# +---+---+
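As a sanity check without a Spark session, the same pipeline (parse each line, build directed edges, keep only edges whose reverse also exists, deduplicate) can be sketched in plain Python. The function name and parsing details here are illustrative, not part of the Spark solution:

```python
# Minimal pure-Python sketch of the same mutual-contact logic.
def mutual_pairs(lines):
    edges = set()
    for line in lines:
        sender, _, recipients = line.partition("->")
        sender = sender.strip()
        for r in recipients.split(","):
            edges.add((sender, r.strip()))
    # keep an edge only if its reverse exists; sort each pair to dedupe
    return sorted({tuple(sorted(e)) for e in edges if (e[1], e[0]) in edges})

data = [
    "K-> M, H",
    "M-> K, E",
    "H-> F",
    "B-> T, H",
    "E-> K, H",
    "F-> K, H, E",
    "A-> Z",
]
print(mutual_pairs(data))  # [('F', 'H'), ('K', 'M')]
```

This matches the DataFrame result: the only mutual pairs in the sample data are K/M and H/F.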