如何使用火花卡桑德拉连接器以批处理方式装入集合数据类型

问题描述 投票:0回答:1

我试图加载它与收集的数据类型两个属性为卡桑德拉表火花数据帧。

在进入的原料的文件,这些属性是文本/字符串。我用下面的代码转换的字符串类型分别列表,地图类型:

    spark.udf.register("getLst", (input: String) => input.split(",").toList)

    spark.udf.register("getMap", (input:String) => parse(input).values.asInstanceOf[Map[String, String]])

    val ofr_data_final=spark.sql("""select  
            ...
            getLst(acct_nb_ls) as acct_nb_ls, 
            getMap(brw_eci_and_sts_mp) as brw_eci_and_sts_mp, 
            .....""")

火花数据帧的打印架构显示如下所示的那些两个属性:

    |-- acct_nb_ls: array (nullable = true)
    |    |-- element: string (containsNull = true)
    |-- brw_eci_and_sts_mp: map (nullable = true)
    |    |-- key: string
    |    |-- value: string (valueContainsNull = true)

在卡桑德拉,这两个属性的定义如下所示:

    acct_nb_ls FROZEN<LIST<text>>,
    brw_eci_and_sts_mp FROZEN<MAP<text, text>>,

这里是我的负荷声明:

    ofr_data_final.rdd.saveToCassandra(Config.keySpace,offerTable, writeConf = WriteConf(ttl = TTLOption.perRow("ttl")))

然而,负载失败,下面的错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 140 in stage 24.0 failed 4 times, most recent failure: Lost task 140.3 in stage 24.0 (TID 1741, bdtcstr70n12.svr.us.jpmchase.net, executor 9): java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
    at com.jpmc.mars.LoadOfferData$.delayedEndpoint$com$jpmc$mars$LoadOfferData$1(LoadOfferData.scala:246)
    at com.jpmc.mars.LoadOfferData$delayedInit$body.apply(LoadOfferData.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.jpmc.mars.LoadOfferData$.main(LoadOfferData.scala:22)
    at com.jpmc.mars.LoadOfferData.main(LoadOfferData.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我怀疑这个问题可能是因为属性acct_nb_lst被推断为“阵”而不是“清单”,但我不知道如何使火花推断为“名单”而不是“阵”。在我的UDF,我曾提到的定义

    input.split(",").toList

但它仍然是越来越推断为数组。

apache-spark collections spark-cassandra-connector
1个回答
0
投票

使用火花卡桑德拉连接器在批处理模式下加载集合数据类型的工作与使用rdd.saveToCassandra创纪录的水平TTL选项的预期。这个问题是与数据。数据太旧了,有它产生的负的TTL值,因此加载失败过去过期日期。

星火错误信息应加强暗示。

© www.soinside.com 2019 - 2024. All rights reserved.