我使用scala API为我们发送地图的每个会话都有一个mapType in spark中的问题,在这个地图中你可以找到用户访问的类别与每个类别中的事件数量相关联
[ home & personal items > interior -> 1, vehicles > cars -> 1]
并非所有用户都访问相同数量的类别,因此地图的大小会根据user_id发生变化
我需要计算按类别分组的会话数量,以便我需要在地图上循环,而不是它不是空的我以前尝试过的东西
while (size(col("categoriesRaw")) !== 0) {
df.select(
explode(col("categoriesRaw"))
)
.select(
col("key").alias("categ"),
col("value").alias("number_of_events")
)
}
但我面临一些错误,如:
type mismatch;
found : org.apache.spark.sql.Column
required: Booleansbt
我不确定你要用while循环做什么。无论如何,您可以使用REPL检查您用作条件的表达式是Column
而不是Boolean
,因此异常。
> size(col("categoriesRaw")) !== 0
res1: org.apache.spark.sql.Column = (NOT (size(categoriesRaw) = 0))
基本上,这是一个表达式,需要在where
,select
或任何其他使用Columns的函数中由SparkSQL进行评估。
然而,你的火花代码几乎就在那里,你只需要添加一个groupBy
就可以到达你想要的地方。让我们从创建数据开始。
import spark.implicits._
val users = Seq( "user 1" -> Map("home & personal items > interior" -> 1,
"vehicles > cars" -> 1),
"user 2" -> Map("vehicles > cars" -> 3))
val df = users.toDF("user", "categoriesRaw")
然后,您不需要while循环来迭代映射的所有值。 explode
为您做到了这一点:
val explodedDf = df.select( explode('categoriesRaw) )
explodedDf.show(false)
+--------------------------------+-----+
|key |value|
+--------------------------------+-----+
|home & personal items > interior|1 |
|vehicles > cars |1 |
|vehicles > cars |3 |
+--------------------------------+-----+
最后,你可以使用groupBy add get你想要的。
explodedDf
.select('key as "categ", 'value as "number_of_events")
.groupBy("categ")
.agg(count('*), sum('number_of_events))
.show(false)
+--------------------------------+--------+---------------------+
|categ |count(1)|sum(number_of_events)|
+--------------------------------+--------+---------------------+
|home & personal items > interior|1 |1 |
|vehicles > cars |2 |4 |
+--------------------------------+--------+---------------------+
注意:我不确定你是否想要计算会话(第1列)或事件(第2列),所以我计算了两者。