我有一个数据框,它是
hive_metastore
中的 Delta 表列表。
对于每个表,我想获取增量日志以提取一些信息。我可以通过收集数组中的 DataFrame 并单独处理每一行(顺序或并行)来做到这一点,但我正在尝试测试使用 UDF 是否可以加快所有或某些部分的速度。
UDF:
import org.apache.spark.sql.functions.udf
val udfGetDeltaLog = udf(
(catalog: String, database: String, table: String) => {
val deltaLog = try {
Some(DeltaLog.forTable(spark, TableIdentifier(table, Some(database), Some(catalog))))
} catch {
case e: Throwable => None
}
deltaLog match {
case Some(log) => log.logPath.toString
case None => null
}
}
)
spark.udf.register("udfGetDeltaLog", udfGetDeltaLog)
原始 DataFrame 具有架构和几行(出于合规性原因,我编辑了表的名称):
inputTables.printSchema
root
|-- catalog_name: string (nullable = true)
|-- database_name: string (nullable = true)
|-- table_name: string (nullable = true)
inputTables.limit(3).show(false)
+--------------+-------------+-------------------------------------------+
|catalog_name |database_name|table_name |
+--------------+-------------+-------------------------------------------+
|hive_metastore|adw_poc |table1 |
|hive_metastore|alfam |table2 |
|hive_metastore|alfam |table3 |
+--------------+-------------+-------------------------------------------+
我创建的测试数据框:
val df = Seq(
("hive_metastore", "adw_poc", "table1"),
("hive_metastore", "alfam", "table2"),
("hive_metastore", "alfam", "table3")
).toDF("catalog_name", "database_name", "table_name")
df.printSchema
root
|-- catalog_name: string (nullable = true)
|-- database_name: string (nullable = true)
|-- table_name: string (nullable = true)
当我将 UDF 与测试 DataFrame 一起使用时,我得到:
df
.withColumn("dl", udfGetDeltaLog($"catalog_name", $"database_name", $"table_name"))
.show(false)
+-------------------------------------------------------------+
|dl |
+-------------------------------------------------------------+
|dbfs:/user/hive/warehouse/adw_poc.db/table1/_delta_log |
|dbfs:/user/hive/warehouse/alfam.db/table2/_delta_log |
|dbfs:/user/hive/warehouse/alfam.db/table3/_delta_log |
+-------------------------------------------------------------+
但是当我在原始 DataFrame 上使用 UDF 时,我得到了所有
null
值
+----+
|dl |
+----+
|null|
|null|
|null|
+----+
你知道为什么会这样吗? 干杯
如果您查看测试数据集的计划,您会发现它是 LocalRelation,它们的行为与其他计划不同。顾名思义,它在驱动程序本地运行,在 Databricks 15.x 上,这将导致编译被禁用。
实际行为的关键原因可能是由于您使用了 Spark 变量:
val deltaLog = try {
Some(DeltaLog.forTable(spark, TableIdentifier(table, Some(database), Some(catalog))))
} catch {
case e: Throwable => None
}
这对于执行者来说将为空。这通常会导致 NPE,但当您捕获 Throwable 时,您最终会得到 None,然后将其映射到输出数据集中看到的 null。
您不能在执行器上使用 SparkSession 或 SQLContext 等。