Spark中的查找表

问题描述 投票:2回答:1

我在Spark中有一个数据框,没有明确定义的模式,我想用作查找表。例如,下面的数据框:

+------------------------------------------------------------------------+
|lookupcolumn                                                            |
+------------------------------------------------------------------------+
|[val1,val2,val3,val4,val5,val6]                                         |
+------------------------------------------------------------------------+

架构看起来像这样:

 |-- lookupcolumn: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |    |-- key2: string (nullable = true)
 |    |-- key3: string (nullable = true)
 |    |-- key4: string (nullable = true)
 |    |-- key5: string (nullable = true)
 |    |-- key6: string (nullable = true)

我说“架构没有明确定义”,因为在读取数据时键的数量是未知的,所以我将它留给Spark来推断架构。

现在,如果我有另一个带有列的数据框,如下所示:

+-----------------+
|       datacolumn|
+-----------------+
|         key1    |
|         key3    |
|         key5    |
|         key2    |
|         key4    |
+-----------------+

我希望结果如下:

+-----------------+
|     resultcolumn|
+-----------------+
|         val1    |
|         val3    |
|         val5    |
|         val2    |
|         val4    |
+-----------------+

我试过像这样的UDF

val get_val = udf((keyindex: String) => {
    val res = lookupDf.select($"lookupcolumn"(keyindex).alias("result"))
    res.head.toString
})

但它会抛出Null指针异常错误。

有人能告诉我UDF有什么问题吗?如果有更好/更简单的方法在Spark中进行这种查找?

scala apache-spark apache-spark-sql spark-dataframe user-defined-functions
1个回答
0
投票

我假设查找表非常小,在这种情况下,将它收集到驱动程序并将其转换为正常的Map会更有意义。然后在Map函数中使用此UDF。它可以通过多种方式完成,例如:

val values = lookupDf.select("lookupcolumn.*").head.toSeq.map(_.toString)
val keys = lookupDf.select("lookupcolumn.*").columns
val lookup_map = keys.zip(values).toMap

使用上面的lookup_map变量,UDF将简单地:

val lookup = udf((key: String) => lookup_map.get(key))

最终的数据帧可以通过以下方式获得:

val df2 = df.withColumn("resultcolumn", lookup($"datacolumn"))
© www.soinside.com 2019 - 2024. All rights reserved.