try catch块没有捕获异常

问题描述 投票:1回答:2

我将DStream保存到Cassandra。 Cassandra中有一列有map<text, text>数据类型。 Cassandra不支持Map中的null值,但可以在流中出现null值。

如果出现问题,我已经添加了try catch,但程序停止了,尽管如此,我在日志中看不到错误消息:

   try {
      cassandraStream.saveToCassandra("table", "keyspace")
    } catch {
      case e: Exception => log.error("Error in saving data in Cassandra" + e.getMessage, e)
    }

例外

Caused by: java.lang.NullPointerException: Map values cannot be null
    at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:2026)
    at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:1909)
    at com.datastax.driver.core.AbstractData.set(AbstractData.java:530)
    at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
    at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnUnset(BoundStatementBuilder.scala:73)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$bind$1.apply$mcVI$sp(BoundStatementBuilder.scala:106)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:101)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    ... 3 more

我想知道为什么程序停止了,尽管try / catch块。为什么没有抓住异常?

scala apache-spark exception-handling try-catch spark-streaming
2个回答
1
投票

要理解失败的根源,你必须承认DStreamFunctions.saveToCassandra,与DStream输出操作一般,不是严格意义上的行为。在实践中it just invokes foreachRDD

dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))

which in turn

将函数应用于此DStream中的每个RDD。这是一个输出操作符,因此'this'DStream将被注册为输出流,因此具体化。

差异是微妙的,但很重要 - 操作已注册,但实际执行发生在不同的上下文中,在稍后的时间点。

这意味着在您调用saveToCassandra时没有捕获运行时故障。

正如已经指出的那样,如果直接应用于某个动作,tryTry将包含驱动程序异常。所以你要重新实现saveToCassandra

dstream.foreachRDD(rdd => try { 
  rdd.sparkContext.runJob(rdd, writer.write _) 
} catch {
  case e: Exception => log.error("Error in saving data in Cassandra" + e. getMessage, e)
})

尽管当前批次将完全或部分丢失,但流应该能够继续进行。

重要的是要注意,这与捕获原始异常不同,原始异常将在日志中被抛出,未被捕获和可见。要从源头捕获问题,你必须直接在编写器中应用try / catch块,当你执行没有控制权的代码时,这显然不是一个选项。

带走消息(已在此主题中说明) - 确保清理数据以避免已知的故障源。


-1
投票

问题是你没有发现你认为你做的例外。您拥有的代码将捕获驱动程序异常,事实上,像这样构造的代码将执行此操作。

但这并不意味着

该计划永远不会停止。

虽然驱动程序失败(这可能是致命的执行程序失败的结果)被包含,并且驱动程序可以正常退出,因此流已经消失。因此,您的代码退出,因为没有更多的流可以运行。

如果有问题的代码在您的控制之下,则应该将异常处理委托给该任务,但是如果是第三方代码,则没有这样的选项。

相反,您应该验证您的数据,并删除有问题的记录,然后再将这些记录传递给saveToCassandra

© www.soinside.com 2019 - 2024. All rights reserved.