我将DStream
保存到Cassandra。 Cassandra中有一列有map<text, text>
数据类型。 Cassandra不支持Map中的null
值,但可以在流中出现null值。
如果出现问题,我已经添加了try
catch
,但程序停止了,尽管如此,我在日志中看不到错误消息:
try {
cassandraStream.saveToCassandra("table", "keyspace")
} catch {
case e: Exception => log.error("Error in saving data in Cassandra" + e.getMessage, e)
}
例外
Caused by: java.lang.NullPointerException: Map values cannot be null
at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:2026)
at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:1909)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:530)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnUnset(BoundStatementBuilder.scala:73)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$bind$1.apply$mcVI$sp(BoundStatementBuilder.scala:106)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:101)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
... 3 more
我想知道为什么程序停止了,尽管try / catch块。为什么没有抓住异常?
要理解失败的根源,你必须承认DStreamFunctions.saveToCassandra
,与DStream
输出操作一般,不是严格意义上的行为。在实践中it just invokes foreachRDD
:
dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
将函数应用于此DStream中的每个RDD。这是一个输出操作符,因此'this'DStream将被注册为输出流,因此具体化。
差异是微妙的,但很重要 - 操作已注册,但实际执行发生在不同的上下文中,在稍后的时间点。
这意味着在您调用saveToCassandra
时没有捕获运行时故障。
正如已经指出的那样,如果直接应用于某个动作,try
或Try
将包含驱动程序异常。所以你要重新实现saveToCassandra
dstream.foreachRDD(rdd => try {
rdd.sparkContext.runJob(rdd, writer.write _)
} catch {
case e: Exception => log.error("Error in saving data in Cassandra" + e. getMessage, e)
})
尽管当前批次将完全或部分丢失,但流应该能够继续进行。
重要的是要注意,这与捕获原始异常不同,原始异常将在日志中被抛出,未被捕获和可见。要从源头捕获问题,你必须直接在编写器中应用try
/ catch
块,当你执行没有控制权的代码时,这显然不是一个选项。
带走消息(已在此主题中说明) - 确保清理数据以避免已知的故障源。
问题是你没有发现你认为你做的例外。您拥有的代码将捕获驱动程序异常,事实上,像这样构造的代码将执行此操作。
但这并不意味着
该计划永远不会停止。
虽然驱动程序失败(这可能是致命的执行程序失败的结果)被包含,并且驱动程序可以正常退出,因此流已经消失。因此,您的代码退出,因为没有更多的流可以运行。
如果有问题的代码在您的控制之下,则应该将异常处理委托给该任务,但是如果是第三方代码,则没有这样的选项。
相反,您应该验证您的数据,并删除有问题的记录,然后再将这些记录传递给saveToCassandra
。