如何最好地将SparkSQL Dataframe Array [String]列转换为新的[String]列

问题描述 投票:0回答:1

我是Spark的新手,我有一个包含4列的Apache SparkSQL DataFrame df,具有以下模式:

root
 |-- _id: string (nullable = false)
 |-- _title: string (nullable = false)
 |-- _published-at: date (nullable = false)
 |-- p: array (nullable = true)
 |    |-- element: string (containsNull = true)

df包含许多(一百万左右)新闻文章,其中列包含,每个记录:唯一ID(_id),标题(_title),发布日期(_published-at),以及每篇文章中文本段落的String数组(p)。

我现在想将“p”列从文章段落的当前格式Array[String]转换为完整文章文本的融合String,其中转换是一个简单的映射,其中段落元素与它们之间的空格(“”)连接在一起,导致新的第五个String列添加到df。即这样的事情:

df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))

这不起作用。然而,这似乎是一个微不足道的问题,但我一定有错。在Spark的functions包中,可以找到许多功能,但似乎没有一个适合这里。我必须以某种方式使用“用户定义函数”(UDF)吗?如果可能的话,最好是避免它。

可以通过以下方式将其转换为String,从而产生新的Dataset[String] dsFullArticles

dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")

.as[Array[String]]似乎需要解开实际包裹“p”列中每个WrappedArray元素的Array[String])。但是如何将dsFullArticles作为新列添加到df

此后,我还想在“fullarticle”列中找到每篇文章最长单词的长度,并将其作为第六列添加到df

// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
  dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match {
    case x if x.isEmpty => 0  
    case x => x.max
  }))

上面的代码也可以生成Dataset[int],但同样如何将它作为列添加到df?同样的问题在这里。当拥有相同的DataFrame df时,可以很容易地进行各种SQL选择,过滤等。

scala apache-spark-sql apache-spark-dataset
1个回答
1
投票

你可以使用concat_ws函数:

concat_ws(sep,[str | array(str)] +) - 返回由sep分隔的字符串的串联。

在你的情况下:

df.withColumn("fullarticle", concat_ws(" ",col("p")))
© www.soinside.com 2019 - 2024. All rights reserved.