使用Spark Cassandra Connector的NoHostAvailableException(没有尝试过主机)

问题描述 投票:1回答:1

我遇到了Cassandra的DataStax Spark Connector问题。我的应用程序包含一个Spark操作,它在Cassandra数据库上执行许多单记录查询;其中一些查询将成功,但在某些时候,其中一个查询将失败,并带有NoHostAvailableException和消息All host(s) tried for query failed (no host was tried)

堆栈跟踪

2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy15.execute(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy16.execute(Unknown Source)
    at [line that contains the session.execute() call]
    [...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
    at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    ... 32 more

为了分析这个问题,我成功地在一个简单的环境中重现了它:

  • 一台运行Cassandra,Spark大师和火花工人的机器
  • 一个简单的表,只包含100条记录(10个分区,每个分区有10条记录)

下面是我可以用来重现问题的最小代码。

val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]

val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
    connector.withSessionDo { session =>
        val clusteringKeyValues = Seq(...)

        val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")

        iterator.flatMap { pkColumn2Value =>
            val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
                preparedStatement.bind(
                    pkColumn1Value.asInstanceOf[AnyRef]
                    , pkColumn2Value.asInstanceOf[AnyRef]
                    , clusteringKeyValue.asInstanceOf[AnyRef]
                )
            )

            boundStatements.map { boundStatement =>
                val record = try {
                    session.execute(boundStatement).one()
                } catch {
                    case noHostAvailableException: NoHostAvailableException =>
                        log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
                        throw noHostAvailableException
                    case exception =>
                        throw exception
                }

                log.error(s"Retrieved record $record")
                // Sleep to simulate an operation being performed on the value.
                Thread.sleep(100)

                record
            }
        }
    }
}

log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")

我注意到一些有趣的事情

  • 我使用Dataset#mapPartitions()只能为每个分区准备一次select语句。当我吞下自己的骄傲并使用Dataset#map()Dataset#flatMap()代替时,问题就消失了,但我想使用Dataset#mapPartitions()为每个数据集分区只准备一次查询的(表面)性能优势。
  • 执行第一个查询后,NoHostAvailableException似乎会发生一段固定的时间。一些调查证实,这段时间等于连接器属性spark.cassandra.connection.keep_alive_ms的值。将此属性设置为一个非常高的值表面上可以解决问题,但这似乎是一个肮脏的解决方案,而不是一个明智的解决方案。

在用于连接器的this GitHub issue中,评论者pkolaczk提到了一个潜在的问题,可能导致连接器在与Cassandra的初始连接中成功,并在尝试以后建立其他连接时失败。这听起来很有希望,因为它与上述点匹配(这表明问题只会在原始连接关闭后才会发生,如果为数据集中的每个元素单独重新建立连接,则不会发生这种情况);但是,我一直无法找到任何迹象表明我错误配置了IP地址或任何其他可能导致这种现象的原因(或者甚至确认这种现象实际上是导致问题的原因)。

我检查和/或尝试过的一些事情

  • 多个在线消息来源表明,NoHostAvailableExceptions总是先于其他错误。我已多次检查我的日志,但找不到任何其他错误消息或堆栈跟踪。
  • 另一个StackOverflow问题的答案建议调用NoHostAvailableException#getErrors来获得更详细的问题解释,但这个方法总是为我返回一个空映射。
  • 当我使用RDD而不是数据集时,问题仍然存在(包括它仅在使用mapPartitions而不是在使用map时发生的事实)。
  • 连接器属性spark.cassandra.connection.local_dc最初未设置。将此属性设置为适当的数据中心名称对此问题没有明显影响。
  • 我尝试将连接器属性spark.cassandra.connection.timeout_msspark.cassandra.read.timeout_ms设置为可笑的高值;这对这个问题没有明显的影响。

一些版本号

  • Spark:用2.1.1和2.3.0重现了这个问题
  • 卡桑德拉:3.11
  • 连接器:用2.0.3和2.3.0重现了这个问题
  • 比例:2.11

任何迹象表明导致这些错误的原因或如何解决问题都将受到高度赞赏。

apache-spark cassandra spark-cassandra-connector
1个回答
0
投票

我将这个问题交叉发布到连接器的谷歌用户组(https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ),其中一个贡献者确认没有理由不为spark.cassandra.connection.keep_alive_ms提供高价值。我已经把这个值提高到了一个可以合理地确定没有操作可以通过它的点,并且从那时起就没有问题了。

© www.soinside.com 2019 - 2024. All rights reserved.