我试图加载它与收集的数据类型两个属性为卡桑德拉表火花数据帧。
在进入的原料的文件,这些属性是文本/字符串。我用下面的代码转换的字符串类型分别列表,地图类型:
spark.udf.register("getLst", (input: String) => input.split(",").toList)
spark.udf.register("getMap", (input:String) => parse(input).values.asInstanceOf[Map[String, String]])
val ofr_data_final=spark.sql("""select
...
getLst(acct_nb_ls) as acct_nb_ls,
getMap(brw_eci_and_sts_mp) as brw_eci_and_sts_mp,
.....""")
火花数据帧的打印架构显示如下所示的那些两个属性:
|-- acct_nb_ls: array (nullable = true)
| |-- element: string (containsNull = true)
|-- brw_eci_and_sts_mp: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
在卡桑德拉,这两个属性的定义如下所示:
acct_nb_ls FROZEN<LIST<text>>,
brw_eci_and_sts_mp FROZEN<MAP<text, text>>,
这里是我的负荷声明:
ofr_data_final.rdd.saveToCassandra(Config.keySpace,offerTable, writeConf = WriteConf(ttl = TTLOption.perRow("ttl")))
然而,负载失败,下面的错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 140 in stage 24.0 failed 4 times, most recent failure: Lost task 140.3 in stage 24.0 (TID 1741, bdtcstr70n12.svr.us.jpmchase.net, executor 9): java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
at com.jpmc.mars.LoadOfferData$.delayedEndpoint$com$jpmc$mars$LoadOfferData$1(LoadOfferData.scala:246)
at com.jpmc.mars.LoadOfferData$delayedInit$body.apply(LoadOfferData.scala:22)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.jpmc.mars.LoadOfferData$.main(LoadOfferData.scala:22)
at com.jpmc.mars.LoadOfferData.main(LoadOfferData.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我怀疑这个问题可能是因为属性acct_nb_lst被推断为“阵”而不是“清单”,但我不知道如何使火花推断为“名单”而不是“阵”。在我的UDF,我曾提到的定义
input.split(",").toList
但它仍然是越来越推断为数组。
使用火花卡桑德拉连接器在批处理模式下加载集合数据类型的工作与使用rdd.saveToCassandra创纪录的水平TTL选项的预期。这个问题是与数据。数据太旧了,有它产生的负的TTL值,因此加载失败过去过期日期。
星火错误信息应加强暗示。