标量中的一跳和两跳引用

问题描述 投票:0回答:1

我有一个“从节点”和“到节点”的列表,看起来像这样:

1234        4567
1234        6789
1234        3456
4567        9876
….

关键是要找出哪个节点最重要,这意味着哪个节点被一跳和两跳引用最多:1234 with(4567,6789,3456,9876(因为它已连接到4567))]

[我目前所做的只是一个map和reduce函数,以获取出现最多的节点,该节点将覆盖单个节点的引用。但是我需要在A-> B和B-> C为此A-> C的情况下进行介绍。

当前找到前十个节点的代码:

val textFile = sc.textFile("cit-Patents.txt")

val arrayForm = textFile.filter(_.charAt(0)!='#')

val mapreduce = arrayForm.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);

我知道graphX也可以帮助解决这个问题,但我不知道该怎么做。

如果您需要更多信息,请告诉我。谢谢。

scala spark-graphx
1个回答
0
投票

我认为,根据您的条件,您不需要spark-graphx。您只需将基地DataFrame与其自身结合即可解决问题,请查看代码:

假设我们有从X到Y的直接链接的DataFrame:

val df = Seq(
  (1234, 4567),
  (1234, 6789),
  (1234, 3456),
  (4567, 9876),
  (5, 6),
  (6, 7),
  (6, 8),
  (6, 9),
  (5, 9),
  (6, 10)
).toDF("X", "Y")

我们看到,某些行具有与其他行Y值相同的X值,这意味着我们可以根据条件将DataFrame与其自身连接(让我们使用ab别名):a.Y应该等于b.X

import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
  df.as("b"), 
  col("a.Y") === col("b.X")
)
  .select(col("a.X").as("X"), col("b.Y").as("Y"))

现在我们看到从a.Xb.Y的所有传递链接:

twoHopCitation.show()
+----+----+
|   X|   Y|
+----+----+
|1234|9876|
|   5|  10|
|   5|   9|
|   5|   8|
|   5|   7|
+----+----+

所以,我们需要的是将这两个DataFrame合并,并通过X计数Y并按count Y降序排序:

df.union(
  twoHopCitation
)
  .groupBy("X")
  .agg(count(col("Y")).as("cntY"))
  .sort(col("cntY").desc)
  .show()
+----+----+
|   X|cntY|
+----+----+
|   5|   6|
|   6|   4|
|1234|   4|
|4567|   1|
+----+----+
© www.soinside.com 2019 - 2024. All rights reserved.