我有一个大的文本文件,其中包含一些Wikimedia项目的页面视图。 (如果您真的很感兴趣,可以找到here。)每行用空格分隔,包含一个Wikimedia页面的统计信息。该架构如下所示:<project code> <page title> <num hits> <page size>
在Scala中,使用Spark RDD或数据框,我希望根据项目代码来计算每个项目的总点击数。因此,例如对于代码为“ zw”的项目,我想找到所有以项目代码“ zw”开头的行,并将它们的总和相加。显然,应同时对所有项目代码执行此操作。
我看过诸如AggregateByKey之类的函数,但是我发现的示例没有足够详细,特别是对于具有4个字段的文件。我认为这是MapReduce的一种工作,但是如何实现它却超出了我的范围。
任何帮助将不胜感激。
首先,您必须以Dataset[String]
格式读取文件。然后,将每个字符串解析为一个元组,以便可以轻松将其转换为Dataframe
。一旦有了Dataframe
,简单的.GroupBy().agg()
就足以完成计算。
import org.apache.spark.sql.functions.sum
val df = spark.read.textFile("/tmp/pagecounts.gz").map(l => {
val a = l.split(" ")
(a(0), a(2).toLong)
}).toDF("project_code", "num_hits")
val agg_df = df.groupBy("project_code")
.agg(sum("num_hits").as("total_hits"))
.orderBy($"total_hits".desc)
agg_df.show(10)
上面的片段按总点击数显示了前10个项目代码。
+------------+----------+
|project_code|total_hits|
+------------+----------+
| en.mw| 5466346|
| en| 5310694|
| es.mw| 695531|
| ja.mw| 611443|
| de.mw| 572119|
| fr.mw| 536978|
| ru.mw| 466742|
| ru| 463437|
| es| 400632|
| it.mw| 400297|
+------------+----------+
当然也可以使用较旧的API作为RDD
映射/减少操作,但是您会丢失Dataset
/Dataframe
api带来的许多优化。