我有一个“从节点”和“到节点”的列表,看起来像这样:
1234 4567
1234 6789
1234 3456
4567 9876
….
关键是要找出哪个节点最重要,这意味着哪个节点被一跳和两跳引用最多:1234 with(4567,6789,3456,9876(因为它已连接到4567))]
[我目前所做的只是一个map和reduce函数,以获取出现最多的节点,该节点将覆盖单个节点的引用。但是我需要在A-> B和B-> C为此A-> C的情况下进行介绍。
当前找到前十个节点的代码:
val textFile = sc.textFile("cit-Patents.txt")
val arrayForm = textFile.filter(_.charAt(0)!='#')
val mapreduce = arrayForm.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);
我知道graphX也可以帮助解决这个问题,但我不知道该怎么做。
如果您需要更多信息,请告诉我。谢谢。
我认为,根据您的条件,您不需要spark-graphx。您只需将基地DataFrame
与其自身结合即可解决问题,请查看代码:
假设我们有从X到Y的直接链接的DataFrame:
val df = Seq(
(1234, 4567),
(1234, 6789),
(1234, 3456),
(4567, 9876),
(5, 6),
(6, 7),
(6, 8),
(6, 9),
(5, 9),
(6, 10)
).toDF("X", "Y")
我们看到,某些行具有与其他行Y
值相同的X
值,这意味着我们可以根据条件将DataFrame与其自身连接(让我们使用a
和b
别名):a.Y
应该等于b.X
:
import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
df.as("b"),
col("a.Y") === col("b.X")
)
.select(col("a.X").as("X"), col("b.Y").as("Y"))
现在我们看到从a.X
到b.Y
的所有传递链接:
twoHopCitation.show()
+----+----+
| X| Y|
+----+----+
|1234|9876|
| 5| 10|
| 5| 9|
| 5| 8|
| 5| 7|
+----+----+
所以,我们需要的是将这两个DataFrame合并,并通过X
计数Y
并按count Y
降序排序:
df.union(
twoHopCitation
)
.groupBy("X")
.agg(count(col("Y")).as("cntY"))
.sort(col("cntY").desc)
.show()
+----+----+
| X|cntY|
+----+----+
| 5| 6|
| 6| 4|
|1234| 4|
|4567| 1|
+----+----+