我们希望从
hive_metastore
中的表格中收集各种信息,以生成用于未来优化的指标。
hive_metastore
包含数百个模式和总共约 150K 个表。
是否有一种有效/可并行的方法,例如为每个模式运行
spark.sql("SHOW TABLES in {schema}).collect()
,然后为每个表运行 spark.sql("DESCRIBE EXTENDED {table_name}").collect()
之类的命令?
这个问题最好是与 PySpark 一起使用,但如果 Python API 无法实现,那么我们可以使用 Scala。
请告诉我这是否足以说明此案的信息。
提前非常感谢!
下面的代码是scala中的,您可以根据集群大小增加或减少
numThreads
。
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
import org.apache.spark.sql._
val numThreads = 100 // Number of threads to execute query in parallel.
val schema = "" // Schema name
val showTables = s"SHOW TABLES FROM ${schema}"
val tableList = spark
.sql(showTables)
.selectExpr("CONCAT_WS('.', namespace, tableName) AS table")
.distinct
.as[String]
.collect
val tableListPar = tableList.par
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
tableListPar.tasksupport = taskSupport
def describe(table: String): DataFrame = {
val desc = s"DESCRIBE EXTENDED ${table}"
println(s"Executing -> `${desc}`")
spark.sql(desc)
.withColumn("table", lit(table))
.select(
$"table",
$"col_name",
$"data_type",
$"comment"
)
}
display(
tableListPar.map(describe)
.reduce(_.union(_))
)