[Error嗨,我正在尝试将一个新列添加到Spark RDD。我正在尝试在一个数据集中尝试添加发行商在所有游戏中所占比例。数据集如下所示:
名称,平台,年份,类型,发布者,NA_Sales,EU_Sales,JP_Sales,Other_Sales
val vgdataLines = sc.textFile("hdfs:///user/ashhall1616/bdc_data/t1/vgsales-small.csv")
val vgdata = vgdataLines.map(_.split(";"))
def toPercentage(x: Double): Double = {x * 100} val countPubl = vgdata.map(r => (r(4),1)).reduceByKey(_+_)
val addpercen = countPubl.withColumn("count", toPercentage($"count"/countPubl.count(_._2)))
我用withColumn()
添加了新列'count',并且期望输出如下:
(Ubisoft,3,15.0)
谁能告诉我这是怎么回事?
您不能将withColumn
与RDD一起使用,因此将其转换为如下所示的DataFrame,然后使用它
val countPubl : DataFrame = vgdata.map(r => (r(4),1)).reduceByKey(_+_).toDF()
如果您仍想使用RDD,则在将with列添加为后将其转换回RDD
val javaRdd : JavaRDD[Row] = countPubl.withColumn("...",col("...")).toJavaRDD
您不能在RDD中使用“ withColumn”。您可以按照以下步骤进行
val addpercen = countPubl.map({case(key, value) => (key, value, toPercentage(value))})
使用映射将计算的值添加为新列,并根据需要转换为DataFrame
import spark.implicits._
val myDf = addpercen.toDF("key","value","myNewColumn")
myDf.show()
希望有帮助。