在Spark上更新Oracle表时如何避免ORA-00060(检测到死锁)错误

问题描述 投票:-1回答:1

我的火花作业中有一个奇怪的错误,如果可能,我会使用一些解释。

因此,我的Spark作业从Hive表加载数据,将其转换为Dataframe,然后根据某些列更新一个已经存在的Oracle表。

当数据帧不是很大时,作业运行就没有问题。当数据帧很大时,该作业将运行几个小时,然后因Oracle错误而停止:

exception caught: org.apache.spark.SparkException: Job aborted due to stage failure: Task 104 in stage 43.0 failed 4 times, most recent failure: Lost task 104.3 in stage 43.0 (TID 5937, lxpbda55.ra1.intra.groupama.fr, executor 227): java.sql.BatchUpdateException: ORA-00060: deadlock detected while waiting for resource

这是我的代码的工作方式:

//This is where the error appears
modification(df_Delta_Modif, champs, conditions, cstProp)

//This is its definition
def modification(df: DataFrame, champs: List[String], conditions: List[String], cstProp: java.util.Properties) {
    val url = Parametre_mod.oracleUrl
    val options: JDBCOptions = new JDBCOptions(Map("url" -> url, "dbtable" -> Parametre_mod.targetTableBase, "user" -> Parametre_mod.oracleUser,
      "password" -> Parametre_mod.oraclePassword, "driver" -> "oracle.jdbc.driver.OracleDriver", "batchSize" -> "30000"))
    Crud_mod.modifierbatch(df, options, champs, conditions)
  }

//This is the definition of modifierbatch. It starts with establishing a connection to Oracle.
//Which surely works because I use the same thing on other scripts and it works fine
def modifierbatch(df: DataFrame,
              options : JDBCOptions,
               champs: List[String],
               conditions: List[String]) {
    val url = options.url
    val tables = options.table
    val dialect = JdbcDialects_mod.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      getJdbcType(field.dataType, dialect).jdbcNullType
    }
    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(options)
    val batchSize = options.batchSize
    val chainestmt = creerOdreSQLmodificationSimple(champs, conditions, tables) //definition below
    val listChamps: List[Int] = champs.map(rddSchema.fieldIndex):::conditions.map(rddSchema.fieldIndex)
    df.foreachPartition { iterator =>
      //savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
      executePartition(getConnection, tables, iterator, rddSchema, nullTypes, batchSize, chainestmt, listChamps, dialect, 0, "")
    }
  }

//This is the definition of creerOdreSQLmodificationSimple
def creerOdreSQLmodificationSimple(listChamps: List[String], listCondition: List[String], tablecible: String): String = {
    val champs = listChamps.map(_.toUpperCase).mkString(" = ?, ")
    val condition = listCondition.map(_.toUpperCase).mkString(" = ? and ")

    s"""UPDATE ${tablecible} SET ${champs} = ? WHERE ${condition} = ?"""
  }

因此,您可以看到主体不是很复杂。我只是使用批处理来执行Oracle函数(更新)。我不知道是什么原因导致了死锁问题。我在Spark中没有使用任何分区。

[如果需要更多详细信息,请告诉我。谢谢

sql oracle scala apache-spark deadlock
1个回答
0
投票

通过使用df.foreachPartition,似乎正在多个并行连接上完成数据库访问。

如果是这样,则每个分区中必须存在更新相同行的条件。

您的选择是:

  1. 消除重叠部分,这样就不会有两个更新更新同一行。
  2. 如果无法这样做,请安排事情,以确保影响给定行的所有更新都位于同一“分区”中。
  3. 如果无法这样做,请在处理条件值之前对其进行排序。例如,如果您的条件类似于column1 = ? and column2 = ?,并且您设置的值为{(1,'R'),(5,'Q'),(1,'B'),(2,'Z')},对它们进行排序(1,'B')->(1,'R')->(2,'Z')->(5,'Q')。实际上,只要排序顺序是明确的(无关联)并且所有分区都以相同的方式对它们的条件进行排序,则如何排序它们并不重要。
  4. 请勿使用foreachPartition(即,请勿尝试并行运行)。实际上,这只是上面#2的变体。
  5. 按照选项3对工作进行排序将避免死锁,但是您将失去并行运行的很多好处(因为某些分区会阻塞其他分区)。

© www.soinside.com 2019 - 2024. All rights reserved.