多次使用预准备语句,警告Cassandra查询降低性能

问题描述 投票:0回答:2

我从某个地方获取数据并将其插入cassandra每日基础然后我需要从cassandra检索整个星期的数据并进行一些处理并将结果插回到cassandra

我有很多记录,每条记录执行以下大部分操作。

要做到这一点,我已经编写了一个低于其工作正常的程序,但我得到警告,根据API文档不应该多次使用prepare statement其降低性能。

请告诉我如何避免这种情况以改善性能或建议我在scala中实现此目的的任何替代方法。

这是我的代码的一部分:

object CassandraUtils {
  println("##########entered cassandrutils")

  val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  val dashboardSelectQuery: String = "select * from k1.table2 where s_id = ? and a_id = ? and hlth_typ= ? and hlth_s_typ= ?;"
  val insertDashBoardQuery = "insert into k1.table2 (s_id, a_id, hlth_typ, hlth_s_typ, dsh_nval_01, rec_crt_dt, lst_rfr_dt, a_s_no) values (? ,?, ?, ?, ?, ?, ?, ?);"
  val updateDashBoardQuery = "update k1.table2 set dsh_nval_01= ?, lst_rfr_dt= ? where s_id= ? and a_id= ? and hlth_typ= ? and hlth_s_typ= ?;"

  val dInfoSelectQuery = "select d_s_no,d_type,a_id,d_id,s_id from k2.table3 where d_s_no = ?"

  def insert(session: Session, data: THData, batch: BatchStatement) {
    val insertStatement = session.prepare(insertQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    //println("data.st_addr,data.en_addr: ------------------->>>>>>  " + data.st_addr, data.en_addr)
    val boundStatement = new BoundStatement(insertStatement)
    //session.execute(boundStatement.bind( data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa,data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur,data.st_addr,data.en_addr))
    batch.add(boundStatement.bind(data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
  }

  def update(session: Session, data: THData, batch: BatchStatement) {
    val updateStatement = session.prepare(updateQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(updateStatement)
    //session.execute(boundStatement.bind( data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis,data.en_lat, data.en_long, data.s_id,data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id ))
    batch.add(boundStatement.bind(data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis, data.en_lat, data.en_long, data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id))
  }

  def getQueryData(session: Session, im: String): (Long, String, String, String) = {
    //println("query---->>>> :" + dInfoSelectQuery)
    val selectStatement = session.prepare(dInfoSelectQuery)
    val boundStatement = new BoundStatement(selectStatement)
    val result: ResultSet = session.execute(boundStatement.bind(im))
    val row = result.one()
    (row.getLong("s_id"), row.getString("a_id"), row.getString("d_id"), row.getString("d_s_no"))
  }

  def getDashBoardData(session: Session, Data: THData): AssetDashboardData = {
    //println("query---->>>> :" + dashboardSelectQuery)
    val selectStatement = session.prepare(dashboardSelectQuery)
    val boundStatement = new BoundStatement(selectStatement)
    val result: ResultSet = session.execute(boundStatement.bind(Data.s_id, Data.a_id, "odometer", "calculated"))
    var assetDashboardData: AssetDashboardData = null
    val row = result.one()
    if (row != null) {
     //doing some processing
    }
    assetDashboardData
  }

  def dashBoardInsert(session: Session, data: THData, batch: BatchStatement) {
    val insertStatement = session.prepare(insertDashBoardQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(insertStatement)
    batch.add(boundStatement.bind(data.s_id, data.a_id, "odometer", "calculated", data.odometer, new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis()), data.a_s_no))
  }

  def dashBoardUpdate(session: Session, data: THData, batch: BatchStatement) {
    val updateStatement = session.prepare(updateDashBoardQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(updateStatement)
    batch.add(boundStatement.bind(data.odometer, new Date(System.currentTimeMillis()), data.s_id, data.a_id, "odometer", "calculated"))
  }

.................

scala apache-spark cassandra cassandra-3.0 spark-cassandra-connector
2个回答
1
投票

调用准备每次都不是一个好主意..为了避免你可以简单地保留查询字符串与准备好的语句的映射..你可以在启动时填充缓存因此准备只会被调用一次...现在在你的Cassandrautil方法中您将从map获取预准备语句并创建绑定语句并执行它。


0
投票

Datastax文档已经说明了这一点

您应该只准备一次,并在您的应用程序中缓存PreparedStatement(它是线程安全的)。如果使用相同的查询字符串多次调用prepare,则驱动程序将记录警告。

如果只执行一次查询,则预准备语句效率低,因为它需要两次往返。请考虑一个简单的陈述。

Java Driver for Apache Cassandra 3.1 (Earlier version)你可以检查具体的版本。

兑现准备好的声明是您的申请建议。因为ConcurrentHashMap是一个很好的替代品,并继续使用它(PreparedStatement),如文档中所述它是THREAD SAFE。我希望scala也提供相同的实现。

© www.soinside.com 2019 - 2024. All rights reserved.