Spark UDF 不会在 DataFrame 上计算最终值,但会在测试 DataFrame 上计算最终值

问题描述 投票:0回答:1

我有一个数据框,它是

hive_metastore
中的 Delta 表列表。 对于每个表,我想获取增量日志以提取一些信息。我可以通过收集数组中的 DataFrame 并单独处理每一行(顺序或并行)来做到这一点,但我正在尝试测试使用 UDF 是否可以加快所有或某些部分的速度。

UDF:

import org.apache.spark.sql.functions.udf

val udfGetDeltaLog = udf(
  (catalog: String, database: String, table: String) => {
    val deltaLog = try {
      Some(DeltaLog.forTable(spark, TableIdentifier(table, Some(database), Some(catalog))))
    } catch {
      case e: Throwable => None
    }

    deltaLog match {
      case Some(log) => log.logPath.toString
      case None => null
    }
  }
)

spark.udf.register("udfGetDeltaLog", udfGetDeltaLog)

原始 DataFrame 具有架构和几行(出于合规性原因,我编辑了表的名称):

inputTables.printSchema

root
 |-- catalog_name: string (nullable = true)
 |-- database_name: string (nullable = true)
 |-- table_name: string (nullable = true)

inputTables.limit(3).show(false)

+--------------+-------------+-------------------------------------------+
|catalog_name  |database_name|table_name                                 |
+--------------+-------------+-------------------------------------------+
|hive_metastore|adw_poc      |table1                                     |
|hive_metastore|alfam        |table2                                     |
|hive_metastore|alfam        |table3                                     |
+--------------+-------------+-------------------------------------------+

我创建的测试数据框:

val df = Seq(
  ("hive_metastore", "adw_poc", "table1"), 
  ("hive_metastore", "alfam", "table2"), 
  ("hive_metastore", "alfam", "table3")
).toDF("catalog_name", "database_name", "table_name")

df.printSchema

root
 |-- catalog_name: string (nullable = true)
 |-- database_name: string (nullable = true)
 |-- table_name: string (nullable = true)

当我将 UDF 与测试 DataFrame 一起使用时,我得到:

df
  .withColumn("dl", udfGetDeltaLog($"catalog_name", $"database_name", $"table_name"))
  .show(false)

+-------------------------------------------------------------+
|dl                                                           |
+-------------------------------------------------------------+
|dbfs:/user/hive/warehouse/adw_poc.db/table1/_delta_log       |
|dbfs:/user/hive/warehouse/alfam.db/table2/_delta_log         |
|dbfs:/user/hive/warehouse/alfam.db/table3/_delta_log         |
+-------------------------------------------------------------+

但是当我在原始 DataFrame 上使用 UDF 时,我得到了所有

null

+----+
|dl  |
+----+
|null|
|null|
|null|
+----+

你知道为什么会这样吗? 干杯

scala apache-spark databricks user-defined-functions
1个回答
0
投票

如果您查看测试数据集的计划,您会发现它是 LocalRelation,它们的行为与其他计划不同。顾名思义,它在驱动程序本地运行,在 Databricks 15.x 上,这将导致编译被禁用。

实际行为的关键原因可能是由于您使用了 Spark 变量:

val deltaLog = try {
      Some(DeltaLog.forTable(spark, TableIdentifier(table, Some(database), Some(catalog))))
    } catch {
      case e: Throwable => None
    }

这对于执行者来说将为空。这通常会导致 NPE,但当您捕获 Throwable 时,您最终会得到 None,然后将其映射到输出数据集中看到的 null。

您不能在执行器上使用 SparkSession 或 SQLContext 等。

© www.soinside.com 2019 - 2024. All rights reserved.