我是Spark的新手,我有一个包含4列的Apache SparkSQL DataFrame df
,具有以下模式:
root
|-- _id: string (nullable = false)
|-- _title: string (nullable = false)
|-- _published-at: date (nullable = false)
|-- p: array (nullable = true)
| |-- element: string (containsNull = true)
df
包含许多(一百万左右)新闻文章,其中列包含,每个记录:唯一ID(_id),标题(_title),发布日期(_published-at),以及每篇文章中文本段落的String数组(p)。
我现在想将“p”列从文章段落的当前格式Array[String]
转换为完整文章文本的融合String
,其中转换是一个简单的映射,其中段落元素与它们之间的空格(“”)连接在一起,导致新的第五个String
列添加到df
。即这样的事情:
df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))
这不起作用。然而,这似乎是一个微不足道的问题,但我一定有错。在Spark的functions
包中,可以找到许多功能,但似乎没有一个适合这里。我必须以某种方式使用“用户定义函数”(UDF)吗?如果可能的话,最好是避免它。
可以通过以下方式将其转换为String
,从而产生新的Dataset[String] dsFullArticles
:
dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")
(.as[Array[String]]
似乎需要解开实际包裹“p”列中每个WrappedArray
元素的Array[String]
)。但是如何将dsFullArticles
作为新列添加到df
?
此后,我还想在“fullarticle”列中找到每篇文章最长单词的长度,并将其作为第六列添加到df
:
// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match {
case x if x.isEmpty => 0
case x => x.max
}))
上面的代码也可以生成Dataset[int]
,但同样如何将它作为列添加到df
?同样的问题在这里。当拥有相同的DataFrame df
时,可以很容易地进行各种SQL选择,过滤等。
你可以使用concat_ws函数:
concat_ws(sep,[str | array(str)] +) - 返回由sep分隔的字符串的串联。
在你的情况下:
df.withColumn("fullarticle", concat_ws(" ",col("p")))