我遇到了Cassandra的DataStax Spark Connector问题。我的应用程序包含一个Spark操作,它在Cassandra数据库上执行许多单记录查询;其中一些查询将成功,但在某些时候,其中一个查询将失败,并带有NoHostAvailableException
和消息All host(s) tried for query failed (no host was tried)
。
2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy15.execute(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy16.execute(Unknown Source)
at [line that contains the session.execute() call]
[...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 32 more
为了分析这个问题,我成功地在一个简单的环境中重现了它:
下面是我可以用来重现问题的最小代码。
val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]
val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
connector.withSessionDo { session =>
val clusteringKeyValues = Seq(...)
val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")
iterator.flatMap { pkColumn2Value =>
val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
preparedStatement.bind(
pkColumn1Value.asInstanceOf[AnyRef]
, pkColumn2Value.asInstanceOf[AnyRef]
, clusteringKeyValue.asInstanceOf[AnyRef]
)
)
boundStatements.map { boundStatement =>
val record = try {
session.execute(boundStatement).one()
} catch {
case noHostAvailableException: NoHostAvailableException =>
log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
throw noHostAvailableException
case exception =>
throw exception
}
log.error(s"Retrieved record $record")
// Sleep to simulate an operation being performed on the value.
Thread.sleep(100)
record
}
}
}
}
log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")
Dataset#mapPartitions()
只能为每个分区准备一次select语句。当我吞下自己的骄傲并使用Dataset#map()
或Dataset#flatMap()
代替时,问题就消失了,但我想使用Dataset#mapPartitions()
为每个数据集分区只准备一次查询的(表面)性能优势。NoHostAvailableException
似乎会发生一段固定的时间。一些调查证实,这段时间等于连接器属性spark.cassandra.connection.keep_alive_ms
的值。将此属性设置为一个非常高的值表面上可以解决问题,但这似乎是一个肮脏的解决方案,而不是一个明智的解决方案。在用于连接器的this GitHub issue中,评论者pkolaczk提到了一个潜在的问题,可能导致连接器在与Cassandra的初始连接中成功,并在尝试以后建立其他连接时失败。这听起来很有希望,因为它与上述点匹配(这表明问题只会在原始连接关闭后才会发生,如果为数据集中的每个元素单独重新建立连接,则不会发生这种情况);但是,我一直无法找到任何迹象表明我错误配置了IP地址或任何其他可能导致这种现象的原因(或者甚至确认这种现象实际上是导致问题的原因)。
NoHostAvailableException
s总是先于其他错误。我已多次检查我的日志,但找不到任何其他错误消息或堆栈跟踪。NoHostAvailableException#getErrors
来获得更详细的问题解释,但这个方法总是为我返回一个空映射。mapPartitions
而不是在使用map
时发生的事实)。spark.cassandra.connection.local_dc
最初未设置。将此属性设置为适当的数据中心名称对此问题没有明显影响。spark.cassandra.connection.timeout_ms
和spark.cassandra.read.timeout_ms
设置为可笑的高值;这对这个问题没有明显的影响。任何迹象表明导致这些错误的原因或如何解决问题都将受到高度赞赏。
我将这个问题交叉发布到连接器的谷歌用户组(https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ),其中一个贡献者确认没有理由不为spark.cassandra.connection.keep_alive_ms
提供高价值。我已经把这个值提高到了一个可以合理地确定没有操作可以通过它的点,并且从那时起就没有问题了。