使用Spark优化多个JDBC查询

问题描述 投票:1回答:1

我试图使用Spark从Greenplum数据库中提取增量数据。我们为每个表提供了增量数据,其中包含一个名为transactionId的密钥。每个transactionId可以包含一行或多行的数据。所有这些都存储在元数据表中:incKeyTable。我们还有另一个元数据表中每个表的最后移动的transactionIDincKeyLoads。此表包含每个表的一个条目,这是最后更新的transactionId到生产表中。为了找出每个表的增量transactionid,我提出了以下逻辑。

val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap   = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap

现在我在scala Map中有我最后更新的事务ID,如下所示:

lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)

为了找到最新的transactionId(增量数据)来自greenplum,我编写了以下逻辑来查询元数据表:incKeyTable

Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
    val returnMsg = "Full loads on this table"
    val count = incTransIds.where($"load_type" === "FULLLOAD").count
    if(count == 0) {
      incTransIds.createOrReplaceTempView("incTransID")
      val execQuery  = s"SELECT transactionId from incTransID order by transactionId desc"
      val incLogIdDf = spark.sql(execQuery)
      incLogIdDf.show
      val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
      pushLogIds
    } else {
      println("Full load count is greater than zero..")
      returnMsg
    }
}

var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
    val tablename = keyTable.split("\\.")   // Tablename = schema.tablename
    val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
    incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
    }
)

这种方法正在运行,但由于数据帧cdf是一个巨大的数据帧,因此我需要花费很长时间才能在此搜索完成之前从表级别从greenplum中提取整个数据。我试图缓存数据帧:cdf但是包含近500万行,并建议不要将这么大的表缓存到缓存中。我无法想到其他方式可以让我更快地进行搜索。谁能让我知道一个想法,使这个过程成为一个有效的过程?

sql scala apache-spark jdbc greenplum
1个回答
1
投票

问题中的代码不能是你实际运行的代码,因为你在pushLogIds中返回sortLogIds,它从未被定义过,你从schema.tablename中选择而不是从s"schema.$tablename"中选择。这使得很难确切知道发生了什么......

也就是说,从大数据处理的角度来看,您的方法存在一些潜在的问题:

  1. 迭代而不是UNION转换。在其他条件相同的情况下,不是发出许多单独的查询然后在驱动程序上组合结果,最好考虑发出单个查询的方法。这就是优化器有机会提供帮助的方式。在您的情况下,请考虑创建一个Greenplum视图,该视图组合了lutransIdTableMap中的所有表。
  2. 操作而不是连接转换。在sortLogIds中,您正在执行count操作,以决定是否运行其他查询。在其他条件相同的情况下,最好通过连接转换来表达这一点,以便延迟运行操作。之后你发行了一个show,它的封面相当于take(n)。这个动作真的有必要吗?稍后您使用collect生成要在IN运算符中使用的SQL表达式。这是您应该使用连接的另一个示例。总而言之,您执行三次由incTransId表示的相同Greenplum基本查询。如果你坚持这种类型的处理,你绝对应该以某种方式坚持incTransId
  3. SQL程序集而不是DSL使用。通常,如果您通过编程语言而不是通过SparkSQL使用Spark,则应该使用DSL而不是将SQL表达式组合为字符串。这样,您就不需要重新定义视图等。

如果没有完整的代码并且不知道确切的Greenplum架构+分发策略+索引(如果有的话)以及所涉及的数据大小,那么这里有太多的问题需要解决。但是,上面应该给你一个起点。

以下是如何从使用迭代切换到联合的示例。

val allData = Map("table1" -> 101, "table2" -> 212)
  .map { case (tableName, id) =>
    spark.table(tableName).withColumn("id", lit(id))
  }
  .reduceLeft(_ union _)

这是一个如何使用连接而不是collect + IN的示例。

val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action

上面的例子还展示了如何使用collect数据集,例如用.collect().map(_.getInt(0).toString)替换.as[String].collect,它更简单,更安全,更快捷。

希望这可以帮助!

© www.soinside.com 2019 - 2024. All rights reserved.