Unable to authenticate to a Cassandra cluster from a Spark Scala program

Problem description · Votes: 0 · Answers: 3

Please suggest a fix for the issue below, or suggest a different approach to my problem statement. I fetch data from an external source and insert it into Cassandra daily. I then need to retrieve a full week of data from Cassandra, do some processing, and insert the results back into Cassandra.

I have many records, and each record performs most of the operations below. Following the advice on my earlier post, Repreparing preparedstatement warning, I keep a map from query string to prepared statement so that statements are not re-prepared.

I wrote the following Spark Scala program. I verified the Cassandra host details from cqlsh and was able to connect there, but when I try through the program I get an error.

import java.util.concurrent.ConcurrentHashMap

import com.datastax.driver.core.{BoundStatement, PreparedStatement}

class StatementCache {
  // nodes, user, pass, cassport and racdc come from the application configuration.
  val acluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)
  val session = acluster.connect("keyspacename")

  // Cache of prepared statements keyed by CQL string, so each statement
  // is prepared only once per JVM.
  val statementCache: ConcurrentHashMap[String, PreparedStatement] = new ConcurrentHashMap

  def getStatement(cql: String): BoundStatement = {
    var ps: PreparedStatement = statementCache.get(cql)
    if (ps == null) {
      ps = session.prepare(cql)
      statementCache.put(cql, ps)
    }
    ps.bind()
  }
}


import java.net.InetAddress

import com.datastax.driver.core.{BatchStatement, Cluster, Session}
import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, DefaultRetryPolicy, TokenAwarePolicy}
import org.apache.commons.lang3.StringUtils

object CassandraUtils {
  println("##########entered cassandrautils")

  private val psCache: StatementCache = new StatementCache()

  val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa, h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur, st_addr, en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
  val updateQuery = "update k1.table1 set tr_dur=?, id_tm=?, max_sp=?, c_dist=?, h_dist=?, dist=?, c_gal=?, c_mil=?, h_gal=?, h_mil=?, c_epa=?, h_epa=?, epa=?, gal=?, rec_crt_dt=?, mil=?, avg_sp=?, tr_dis=?, en_lat=?, en_long=? where s_id=? and a_id=? and summ_typ=? and summ_dt=? and t_summ_id=?;"

  // Adds the cached statement to the batch; the values from `data`
  // still need to be bound before execution.
  def insert(session: Session, data: TripHistoryData, batch: BatchStatement): Unit = {
    batch.add(psCache.getStatement(insertQuery))
  }

  def update(session: Session, data: TripHistoryData, batch: BatchStatement): Unit = {
    batch.add(psCache.getStatement(updateQuery))
  }

  def initialize(clusterNodes: String, uid: String, pwd: String, port: Int, racdc: String): Cluster = {
    val builder = Cluster.builder().addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(
          DCAwareRoundRobinPolicy.builder() // DCAwareRoundRobinPolicy can also be used without TokenAwarePolicy
            .withLocalDc(racdc) // Case sensitive, as defined in the rac-dc properties file
            //.withUsedHostsPerRemoteDc(2) // Try at most 2 remote-DC nodes if all local nodes are dead
            //.allowRemoteDCsForLocalConsistencyLevel()
            .build()))

    if (StringUtils.isNotEmpty(uid)) {
      builder.withCredentials(uid, pwd)
    }

    builder.build()
  }
}

-----------------------------------------------------------------------------------------------------------------

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ExceptionInInitializerError
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:91)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host hostname1: Host hostname1 requires authentication, but no authenticator found in Cluster configuration
    at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:261)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:243)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:299)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1.run(Futures.java:632)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1288)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more
Tags: scala, apache-spark, cassandra, cassandra-3.0, spark-cassandra-connector
3 Answers
0 votes

You need to provide authentication on the Cluster instance: .withCredentials(username.trim(), password.trim())
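A minimal sketch of that option; the host and credential values below are placeholders for your own configuration:

import com.datastax.driver.core.Cluster

val contactPoint = "hostname1"      // placeholder host
val username     = "cassandra_user" // placeholder credentials
val password     = "cassandra_pass"

val cluster = Cluster.builder()
  .addContactPoint(contactPoint)
  // If the username is empty the driver sends no authenticator, which produces
  // exactly the "no authenticator found in Cluster configuration" error above.
  .withCredentials(username.trim(), password.trim())
  .build()
val session = cluster.connect("keyspacename")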

Alternatively, you can disable authentication on the Cassandra side by changing the authenticator value in cassandra.yaml to AllowAllAuthenticator.

Note: changes to the yaml require a Cassandra restart.


0 votes

I solved the issue: put the lines below inside the getStatement method instead of outside it.

val acluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)

val session = acluster.connect("keyspacename")
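For illustration, one way to express that fix is a lazy session, created on first use of getStatement rather than when the class is constructed. This is a sketch, assuming the same nodes, user, pass, cassport and racdc values as in the question:

import java.util.concurrent.ConcurrentHashMap

import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}

class StatementCache {
  val statementCache: ConcurrentHashMap[String, PreparedStatement] = new ConcurrentHashMap

  // Initialized lazily, i.e. on the JVM that first calls getStatement
  // (an executor), not on the driver when the class is instantiated.
  lazy val session: Session =
    CassandraUtils.initialize(nodes, user, pass, cassport, racdc).connect("keyspacename")

  def getStatement(cql: String): BoundStatement = {
    var ps = statementCache.get(cql)
    if (ps == null) {
      ps = session.prepare(cql)
      statementCache.put(cql, ps)
    }
    ps.bind()
  }
}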

0 votes

Your problem is that you are trying to do "manual" management of connections, and this doesn't work with Spark: the Cluster/Session instances would have to be shipped to the executors, but that fails because those instances are created on the driver. You can of course use the "typical" pattern of doing the work inside foreachPartition, as described in this question.
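A sketch of that per-partition pattern; the RDD name and the record-handling body are illustrative, not from the original post:

rdd.foreachPartition { partition =>
  // Build the connection on the executor, once per partition.
  val cluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)
  val session = cluster.connect("keyspacename")
  try {
    partition.foreach { record =>
      // prepare/bind/execute statements for each record here
    }
  } finally {
    session.close()
    cluster.close()
  }
}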

The best way to work with Cassandra from Spark is to use the Cassandra Spark Connector: it automatically distributes the load between nodes and performs inserts and updates correctly. In that case you can configure the connection parameters, including authentication, through Spark properties (spark.cassandra.auth.username and spark.cassandra.auth.password). More information about connections is in the documentation.
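A minimal sketch of the connector approach, reusing the question's keyspace and table; the host, credentials, date bounds and column types are assumptions, not taken from the original post:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Authentication goes through Spark properties instead of manual Cluster code.
val conf = new SparkConf()
  .setAppName("weekly-summary")
  .set("spark.cassandra.connection.host", "hostname1")    // placeholder host
  .set("spark.cassandra.auth.username", "cassandra_user") // placeholder credentials
  .set("spark.cassandra.auth.password", "cassandra_pass")
val sc = new SparkContext(conf)

// Read a week of rows, extract what we need, and write results back.
val weekStart = "2019-01-01" // placeholder date bounds
val weekEnd   = "2019-01-07"

val processed = sc.cassandraTable("k1", "table1")
  .where("summ_dt >= ? and summ_dt <= ?", weekStart, weekEnd)
  .map { row =>
    // Carry the full primary key through; getter types are assumptions about the schema.
    (row.getString("s_id"), row.getString("a_id"), row.getString("summ_typ"),
     row.getDate("summ_dt"), row.getString("t_summ_id"), row.getDouble("dist"))
  }

processed.saveToCassandra("k1", "table1",
  SomeColumns("s_id", "a_id", "summ_typ", "summ_dt", "t_summ_id", "dist"))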
