How to use the Spark Cassandra Connector API in Scala

Votes: 0 · Answers: 1

My earlier post: Reparing Prepared stmt warning

I was unable to resolve it there; following some of the suggestions, I am now trying to use the Spark Cassandra Connector to solve my problem, but I am completely confused about how to use it in my application. I tried writing the code below, but I am not sure how to use the API correctly.

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "1.1.1.1")
  .set("spark.cassandra.auth.username", "auser")
  .set("spark.cassandra.auth.password", "apass")
  .set("spark.cassandra.connection.port", "9042")

val sc = new SparkContext(conf)

val c = CassandraConnector(sc.getConf)
c.withSessionDo { session =>
  val insertStatement = prepareStatement(session, insertQuery)
  val boundStatement = new BoundStatement(insertStatement)
  batch.add(boundStatement.bind(data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
}

def prepareStatement(session: Session, query: String): PreparedStatement = {
  val cluster = session.getCluster.getClusterName
  get(cluster, query) match {
    case Some(stmt) => stmt
    case None =>
      synchronized {
        get(cluster, query) match {
          case Some(stmt) => stmt
          case None =>
            val stmt = session.prepare(query)
            put(cluster, query, stmt)
            stmt
        }
      }
  }
}


  -----------------------------------------------------------------------------------------OR

val table1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.auth.username", "apoch_user")
  .option("spark.cassandra.auth.password", "Apoch#123")
  .options(Map(
    "table" -> "trip_summary_data",
    "keyspace" -> "aphoc",
    "cluster" -> "Cluster1"
  ))
  .load()


def insert(data: TripHistoryData) {
  table1.createOrReplaceTempView("inputTable1")

  val df1 = spark.sql("select * from inputTable1 where service_id = ? and asset_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?")
  val df2 = spark.sql("insert into inputTable1 values (data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr)")
}
apache-spark cassandra apache-spark-sql spark-cassandra-connector
1 Answer

1 vote

You need to concentrate on how you process your data inside the Spark application, not on how it is read or written (that matters, of course, but only once you hit performance problems).

If you are using Spark, you need to think in Spark terms when processing data, working with RDDs or DataFrames. In that case you need to use constructs like these (with DataFrames):

import org.apache.spark.sql.cassandra._ // provides cassandraFormat

val df = spark
  .read
  .cassandraFormat("words", "test")
  .load()
val newDf = df.filter(...) // some operation on the source data
newDf.write
  .cassandraFormat("words_copy", "test")
  .save()

And avoid calling session.prepare / session.execute, cluster.connect, etc. directly — the Spark connector does the preparing, and other optimizations, under the hood.
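To connect this back to the question: both the parameterized select (df1) and the insert can be expressed with the DataFrame API instead of hand-managed prepared statements. Below is a minimal sketch, assuming the aphoc.trip_summary_data table from the question and a trimmed-down TripHistoryData case class (only a few of the original columns are shown; the filter values are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._ // provides cassandraFormat

// Trimmed-down version of the asker's case class; field names are a
// subset of the original columns.
case class TripHistoryData(service_id: String, asset_id: String,
                           summ_typ: String, avg_sp: Double)

object TripSummaryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("trip-summary")
      .config("spark.cassandra.connection.host", "1.1.1.1")
      .getOrCreate()
    import spark.implicits._

    // The parameterized select becomes a filter; the connector pushes
    // predicates on partition-key columns down to Cassandra.
    val trips = spark.read
      .cassandraFormat("trip_summary_data", "aphoc")
      .load()
      .filter($"service_id" === "s1" && $"asset_id" === "a1")

    // The insert becomes a DataFrame write in append mode.
    val newRow = Seq(TripHistoryData("s1", "a1", "D", 42.0)).toDS()
    newRow.write
      .cassandraFormat("trip_summary_data", "aphoc")
      .mode("append")
      .save()

    spark.stop()
  }
}
```

The key shift is that filtering and inserting are expressed as DataFrame operations, and the connector translates them into efficient CQL behind the scenes.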
