Scala upserts using JDBC PreparedStatement and foreachPartition


I am new to Scala/Java programming.

I need to create a Scala jdbcTemplate to map multiple columns into an SQL query against a PostgreSQL database.

My insert query has about 80 columns.

Something like this:

INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd) 
DO  UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?

The data that needs to go into the "?" placeholders is stored in another dataframe called inputdatafiledfwindow.

The column mapping, i.e. the setter calls used to set values on the prepared statement, needs to be generated dynamically.

          val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
                st.setShort(1, row.getShort(0))
                st.setInt(2, row.getInt(1))
                st.setString(3, row.getString(2).replaceAll("\\000", ""))
                st.setString(4, row.getString(3).replaceAll("\\000", ""))
                st.setString(5, row.getString(4).replaceAll("\\000", ""))
                st.setString(6, row.getString(5).replaceAll("\\000", ""))
                st.setDate(7, row.getDate(6))
                st.setDate(8, row.getDate(7))
                st.setString(9, row.getString(8).replaceAll("\\000", ""))
                st.setString(10, row.getString(9).replaceAll("\\000", ""))
                st.setString(11, row.getString(10).replaceAll("\\000", ""))
                st.setString(12, row.getString(11).replaceAll("\\000", ""))
                st.setString(13, row.getString(12).replaceAll("\\000", ""))
                st.setString(14, row.getString(13).replaceAll("\\000", ""))
                st.setString(15, row.getString(14).replaceAll("\\000", ""))
                st.setString(16, row.getString(15).replaceAll("\\000", ""))
                st.setString(17, row.getString(16).replaceAll("\\000", ""))
                st.setString(18, row.getString(17).replaceAll("\\000", ""))
                st.setString(19, row.getString(18).replaceAll("\\000", ""))
                st.setString(20, row.getString(19).replaceAll("\\000", ""))
                st.setString(21, row.getString(20).replaceAll("\\000", ""))
                st.setString(22, row.getString(21).replaceAll("\\000", ""))
                st.setString(23, row.getString(22).replaceAll("\\000", ""))
                st.setString(24, row.getString(23).replaceAll("\\000", ""))
                st.setString(25, row.getString(24).replaceAll("\\000", ""))
                st.setString(26, row.getString(25).replaceAll("\\000", ""))
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

Currently, I am trying something like this:

val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)

inputdatafiledfwindow.coalesce(10).foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
      val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames.length-1) {
            if (columnDataTypes(i) == "ShortType")
              st.setShort((i+1).toInt, row.getShort(i))
            else if(columnDataTypes(i)== "IntegerType")
              st.setInt((i+1).toInt,row.getInt(i))
            else if (columnDataTypes(i)=="StringType")
              st.setString((i+1).toInt,row.getString(i))
            else if(columnDataTypes(i)=="TimestampType")
              st.setTimestamp((i+1).toInt, row.getTimestamp(i))
            else if(columnDataTypes(i)=="DateType")
              st.setDate((i+1).toInt,row.getDate(i))
            else if (columnDataTypes(i)=="DoubleType")
              st.setDouble((i+1).toInt, row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

This gives me: org.apache.spark.SparkException: Task not serializable

Any ideas or resources I could refer to for an implementation? I know this is possible in Java, but I have not worked much in either Java or Scala.

Edit: tried using broadcast variables inside foreachPartition; still getting org.apache.spark.SparkException: Task not serializable

Below is the full exception stack:


org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:957)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:956)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
  ... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
        - object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
        - field (class: $iw, name: dbc, type: interface java.sql.Connection)
        - object (class $iw, $iw@22459ca5)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@788dd40c)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@31c725ed)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a367987)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@7cffd7ab)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3c615880)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@289fa6c2)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2a5a0934)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a04a12a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@c5fe90a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@58b67f02)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@243a4a22)
        - field (class: $line77.$read, name: $iw, type: class $iw)
        - object (class $line77.$read, $line77.$read@5f473976)
        - field (class: $iw, name: $line77$read, type: class $line77.$read)
        - object (class $iw, $iw@70fc6803)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@26818b0)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 98 more

Update 2: modified the code as per @RamGhadiyaram's suggestion, but am now facing a new NullPointerException. I don't understand where I am going wrong. Surely there must be a simple fix for this.

Below is the updated code:

val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

    val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"

    val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)


    inputdatafiledfwindow.foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(columnNames)
      val columnDataTypes_br = sc.broadcast(columnDataTypes)

      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames_br.value.length-1) {
            if (columnDataTypes_br.value(i) == "ShortType")
              st.setShort((i+1), row.getShort(i))
            else if(columnDataTypes_br.value(i)== "IntegerType")
              st.setInt((i+1),row.getInt(i))
            else if (columnDataTypes_br.value(i)=="StringType")
              st.setString((i+1),row.getString(i))
            else if(columnDataTypes_br.value(i)=="TimestampType")
              st.setTimestamp((i+1), row.getTimestamp(i))
            else if(columnDataTypes_br.value(i)=="DateType")
              st.setDate((i+1),row.getDate(i))
            else if (columnDataTypes_br.value(i)=="DoubleType")
              st.setDouble((i+1), row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

And below is the new exception stack:

20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:87)
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
        at $anonfun$1.apply(<console>:87)
        at $anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
  ... 86 elided
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:87)
  at $anonfun$1.apply(<console>:86)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
postgresql scala apache-spark jdbc prepared-statement
1 Answer

As I understand it, you need to generate all the columns dynamically and insert the values into the corresponding columns with the correct data types.

For your existing dataframe:

    val columnNames : Array[String] = inputdatafiledfwindow.columns

    val columnDataTypes : Array[String] = inputdatafiledfwindow.schema.fields
                            .map(x=>x.dataType)
                            .map(x=>x.toString)

Now you have the columns and their corresponding data types.

You can populate the prepared statement in a loop by dynamically checking each column's data type and calling the appropriate pstmt.setXXX method; the array index (plus one) becomes the parameter index for setXXX.
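
For illustration, here is a minimal sketch of such a loop as a hypothetical helper setParams (it assumes one parameter per column, in the dataframe's column order; the match on the type strings mirrors your if/else chain):

    // Sketch: fill the prepared statement from a Row, choosing the setter
    // by the column's Spark data type. JDBC parameter indices are 1-based,
    // so column i maps to parameter i + 1.
    import java.sql.PreparedStatement
    import org.apache.spark.sql.Row

    def setParams(st: PreparedStatement, row: Row, columnDataTypes: Array[String]): Unit = {
      for (i <- columnDataTypes.indices) {
        columnDataTypes(i) match {
          case "ShortType"     => st.setShort(i + 1, row.getShort(i))
          case "IntegerType"   => st.setInt(i + 1, row.getInt(i))
          case "StringType"    => st.setString(i + 1, row.getString(i))
          case "TimestampType" => st.setTimestamp(i + 1, row.getTimestamp(i))
          case "DateType"      => st.setDate(i + 1, row.getDate(i))
          case "DoubleType"    => st.setDouble(i + 1, row.getDouble(i))
          case other           => throw new IllegalArgumentException(s"Unhandled column type: $other")
        }
      }
    }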

In this case there is no need for a Spring JdbcTemplate; the same thing can be achieved with plain JDBC.

UPDATE 1:


Your column types (i.e. columnDataTypes) and column names array (i.e. columnNames) should be broadcast variables so that they can be used inside foreachPartition. That is most likely the root cause of the error: org.apache.spark.SparkException: Task not serializable.
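
A minimal sketch of that idea, assuming sc is the SparkContext on the driver (as in your code): create the broadcasts before foreachPartition, and only read .value inside the closure. Creating them inside the closure cannot work, because the SparkContext is not available on executors.

    // Sketch: broadcast on the driver, outside foreachPartition.
    import org.apache.spark.sql.Row

    val columnNames_br     = sc.broadcast(inputdatafiledfwindow.columns)
    val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(_.dataType.toString))

    inputdatafiledfwindow.coalesce(10).foreachPartition { (partition: Iterator[Row]) =>
      // Only the broadcast handles are captured by the closure; the arrays
      // themselves are fetched on the executor via .value.
      val names = columnNames_br.value
      val types = columnDataTypes_br.value
      // ... open the JDBC connection and set the parameters here ...
    }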


UPDATE 2: Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection - somewhere in your code the connection is the problem.

AFAIK, you are converting the username and password to strings, which may be coming from some other object..., but in general your code looks OK. Re-check it.

As per the Spark documentation, you can also declare the URL together with the username and password, like below. Try it.

JDBC To Other Databases - the JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g. jdbc:postgresql://localhost/test?user=fred&password=secret
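
For example, something like this (a sketch; host, port, database and credentials are placeholder values):

    // Sketch: credentials embedded in the JDBC URL itself, so no separate
    // user/password arguments are passed to getConnection.
    import java.sql.{Connection, DriverManager}

    val jdbcConnectionString =
      "jdbc:postgresql://localhost:5432/test?user=fred&password=secret"
    val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)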

UPDATE 3: The reason for the NullPointerException is simple. If you operate on a null value, you will get a NullPointerException.

For example: row.getString(8).replaceAll("\\000", "") - if row.getString(8) is null and you apply replaceAll to it, that is a NullPointerException. You must check that row.getString(8) is not null before applying the replaceAll function.

The best way to avoid null pointers is to use Scala's Option.
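
For example, in the StringType branch of the setter loop (a sketch, with st, row and i as above; java.sql.Types.VARCHAR is assumed for the null case):

    // Sketch: wrap the possibly-null value in Option, clean it only when
    // present, and fall back to setNull when the column value is missing.
    import java.sql.Types

    Option(row.getString(i)) match {
      case Some(s) => st.setString(i + 1, s.replaceAll("\\000", ""))
      case None    => st.setNull(i + 1, Types.VARCHAR)
    }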

Another observation: use a Scala-style foreach loop rather than a traditional Java-style indexed loop.
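
For instance, iterating with zipWithIndex (a sketch, using the same variables as above):

    // Sketch: pair each column type with its index instead of keeping a
    // manual counter.
    columnDataTypes.zipWithIndex.foreach { case (dataType, i) =>
      dataType match {
        case "StringType" => st.setString(i + 1, row.getString(i))
        case "DateType"   => st.setDate(i + 1, row.getDate(i))
        case _            => () // ... handle the remaining types as above ...
      }
    }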

Note: please ask each requirement as a separate question. Don't keep adding new issues to this one; it will confuse other readers.
