在RDD中添加新列

问题描述 投票:0回答:2

[Error嗨,我正在尝试将一个新列添加到Spark RDD。我正在尝试在一个数据集中尝试添加发行商在所有游戏中所占比例。数据集如下所示:

名称,平台,年份,类型,发布者,NA_Sales,EU_Sales,JP_Sales,Other_Sales

val vgdataLines = sc.textFile("hdfs:///user/ashhall1616/bdc_data/t1/vgsales-small.csv")
val vgdata = vgdataLines.map(_.split(";"))
def toPercentage(x: Double): Double = {x * 100} val countPubl  = vgdata.map(r =>  (r(4),1)).reduceByKey(_+_)
val addpercen = countPubl.withColumn("count", toPercentage($"count"/countPubl.count(_._2)))

我用withColumn()添加了新列'count',并且期望输出如下:

(Ubisoft,3,15.0)

谁能告诉我这是怎么回事?

scala apache-spark-sql rdd
2个回答
0
投票

您不能将withColumn与RDD一起使用,因此将其转换为如下所示的DataFrame,然后使用它

val countPubl : DataFrame  = vgdata.map(r =>  (r(4),1)).reduceByKey(_+_).toDF()

如果您仍想使用RDD,则在将with列添加为后将其转换回RDD

val javaRdd : JavaRDD[Row] = countPubl.withColumn("...",col("...")).toJavaRDD

0
投票

您不能在RDD中使用“ withColumn”。您可以按照以下步骤进行

val addpercen = countPubl.map({case(key, value) => (key, value, toPercentage(value))})

使用映射将计算的值添加为新列,并根据需要转换为DataFrame

import spark.implicits._
val myDf = addpercen.toDF("key","value","myNewColumn") 

myDf.show()

希望有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.