是否有办法找出Cassandra中的SELECT语句使用了哪个节点?

问题描述 投票:0回答:2

我为spark-cassandra-connector编写了一个自定义LoadBalancerPolicy,现在我想确保它确实有效!

我有一个具有3个节点的Cassandra集群和一个复制因子为2的键空间,因此当我们要检索记录时,cassandra上将只有两个节点来保存数据。

事实是,我想确保spark-cassandra-connector(使用我的负载均衡器策略)仍可识别令牌,并将为每个“ SELECT”语句选择正确的节点作为协调器。

现在,我正在考虑是否可以在每个节点的SELECT语句上编写触发器,如果​​该节点不保存数据,则触发器将创建一个日志,并且我意识到不执行load-balancer-policy好好工作。我们如何在Cassandra中的SELECT上编写触发器?有没有更好的方法可以做到这一点?

我已经检查了用于创建触发器的文档,但这些文档太有限:

Official documentation

Documentation at DataStax

Example implementation in official repo

cassandra database-trigger spark-cassandra-connector
2个回答
0
投票

您可以从程序端进行操作,如果您的绑定语句为get routing key(必须使用准备好的语句),请找到replicas for it via Metadata class,然后比较是否可以从this host is in the ExecutionInfo中获得ResultSet


0
投票

根据亚历克斯所说,我们可以做到如下:

创建SparkSession之后,我们应该建立一个连接器:

import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)

现在我们可以定义一个prepareStatement并完成其余工作:

connector.withSessionDo(session => {

    val selectQuery = "select * from test where id=?"
    val prepareStatement = session.prepare(selectQuery)
    val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
    // We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
    val boundStatement = prepareStatement.bind(s"$id")
    val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
    // We can get tha all of nodes that contains the row
    val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
    val resultSet = session.execute(boundStatement)

    // We can get the node which gave us the row
    val host = resultSet.getExecutionInfo.getQueriedHost

    // Final step is to check whether the replicas contains the host or not!!!
    if (replicas.contains(host)) println("It works!")
  })

重要的是,我们必须显式绑定分区键基于它们的所有参数(即,我们不能在SELECT语句中将它们设置为har-codded,否则,routingKey将为空。

© www.soinside.com 2019 - 2024. All rights reserved.