NullPointerException when writing to Cassandra from Spark


I'm using spark-cassandra-connector-2.4.0-s_2.11 to write data from Spark to Cassandra on a Databricks cluster.

I'm getting a java.lang.NullPointerException while writing data from Spark to Cassandra. With a small number of records this works fine.

But the problem shows up when trying to load ~150 million records.

Can someone help me find the root cause?

Here is the code snippet:

val paymentExtractCsvDF = spark
                          .read
                          .format("csv")
                          .option("header", "true")
                          .load("/home/otl/extract/csvout/Payment")

    paymentExtractCsvDF.printSchema()

root
 |-- BAN: string (nullable = true)
 |-- ENT_SEQ_NO: string (nullable = true)
 |-- PYM_METHOD: string (nullable = true)

case class Payment(account_number: String, entity_sequence_number: String, payment_type: String)
val paymentResultDf = paymentExtractCsvDF.map(row => Payment(row.getAs("BAN"),
        row.getAs("ENT_SEQ_NO"),
        row.getAs("PYM_METHOD"))).toDF()

var paymentResultFilterDf = paymentResultDf
                            .filter($"account_number".isNotNull || $"account_number" != "")
                            .filter($"entity_sequence_number".isNotNull || $"entity_sequence_number" != "")

paymentResultFilterDf
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map( "table" -> "cassandratable", "keyspace" -> "cassandrakeyspace"))
  .save()

Here is the exception I get:

Failed to write statements to cassandrakeyspace.cassandratable. The
latest exception was
  An unexpected error occurred server side on /10.18.15.198:9042: java.lang.NullPointerException

Please check the executor logs for more exceptions and information

    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:243)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:241)
    at scala.Option.map(Option.scala:146)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/22 01:12:17 INFO CoarseGrainedExecutorBackend: Got assigned task 1095
19/11/22 01:12:17 INFO Executor: Running task 39.1 in stage 21.0 (TID 1095)
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 77 non-empty blocks including 10 local blocks and 67 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 3 ms
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Getting 64 non-empty blocks including 8 local blocks and 56 remote blocks
19/11/22 01:12:17 INFO ShuffleBlockFetcherIterator: Started 7 remote fetches in 1 ms
19/11/22 01:12:17 ERROR Executor: Exception in task 7.0 in stage 21.0 (TID 1012)


apache-spark datastax-enterprise cassandra-3.0 spark-cassandra-connector azure-databricks
1 Answer

It looks like your dataframe contains key fields with null values. The problem is probably in your filter conditions. I think you want to do something like this:
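The answer's code snippet did not survive the scrape. Based on the hint, the likely intent is that the question's filters should use `&&` instead of `||`, and Spark's column operator `=!=` instead of Scala's `!=` (plain `!=` compares the `Column` object itself to a `String`, which is always true, so null and empty keys slip through and reach Cassandra). A hedged reconstruction as a small self-contained sketch (the sample data and object name are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative reconstruction of the suggested filter fix, not the
// original answer's code: use && (both conditions must hold) and
// =!= (Spark's column inequality) instead of || and Scala's !=.
object FilterFix {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("filter-fix")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows: one valid, one with a null key, one empty.
    val paymentResultDf = Seq(
      ("1001", "a", "cash"),
      (null.asInstanceOf[String], "b", "card"),
      ("", "c", "check")
    ).toDF("account_number", "entity_sequence_number", "payment_type")

    // Keep only rows whose key columns are non-null AND non-empty.
    // Note =!= : Scala's != on a Column is object comparison and is
    // always true, which is why the original filters removed nothing.
    val paymentResultFilterDf = paymentResultDf
      .filter($"account_number".isNotNull && $"account_number" =!= "")
      .filter($"entity_sequence_number".isNotNull && $"entity_sequence_number" =!= "")

    println(paymentResultFilterDf.count()) // only the first row survives
    spark.stop()
  }
}
```

With the corrected predicates, rows carrying null or empty primary-key values are dropped before the Cassandra write, which is what the server-side NullPointerException points at.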
