如何在Spark中将Cassandra表数据的顺序数值处理转换为并行?

问题描述 投票:1回答:1

我们正在使用spark cassandra连接器对来自Cassandra表的数据进行一些数学建模,并且执行当前是顺序的以获得输出。如何将其并行化以加快执行速度?

我是Spark的新手,我尝试过一些东西,但是我无法理解如何在map,groupby,reduceby函数中使用表格数据。如果有人可以帮助解释(使用一些代码片段)如何parrellize表格数据,这将是非常有帮助的。

import org.apache.spark.sql.{Row, SparkSession}
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf



class SparkExample(sparkSession: SparkSession, pathToCsv: String) {
  private val sparkContext = sparkSession.sparkContext
  sparkSession.stop()
  val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host","127.0.0.1")                           
  .setAppName("cassandra").setMaster("local[*]")
  val sc = new SparkContext(conf)


 def testExample(): Unit = {




val KNMI_rdd = sc.cassandraTable ("dbks1","knmi_w")


val Table_count = KNMI_rdd.count()
val KNMI_idx = KNMI_rdd.zipWithIndex
val idx_key = KNMI_idx.map{case (k,v) => (v,k)}

var i = 0
var n : Int = Table_count.toInt


println(Table_count)

for ( i  <- 1 to n if i < n) {
  println(i)


  val Row = idx_key.lookup(i)

  println(Row)


  val firstRow = Row(0)



  val yyyy_var = firstRow.get[Int]("yyyy")
  val mm_var = firstRow.get[Double]("mm")
  val dd_var = firstRow.get[Double]("dd")
  val dr_var = firstRow.get[Double]("dr")
  val tg_var = firstRow.get[Double]("tg")
  val ug_var = firstRow.get[Double]("ug")
  val loc_var = firstRow.get[String]("loc")



  val pred_factor = (((0.15461 * tg_var) + (0.8954 * ug_var)) / ((0.0000451 * dr_var) + 0.0004487))




  println(yyyy_var,mm_var,dd_var,loc_var)
  println(pred_factor)

 }

 }
}

  //test data

// loc | yyyy | mm | dd | dr  | tg  | ug
//-----+------+----+----+-----+-----+----
// AMS | 2019 |  1 |  1 |  35 |   5 | 84
// AMS | 2019 |  1 |  2 |  76 |  34 | 74
// AMS | 2019 |  1 |  3 |  46 |  33 | 85
// AMS | 2019 |  1 |  4 |  35 |   1 | 84
// AMS | 2019 |  1 |  5 |  29 |   0 | 93
// AMS | 2019 |  1 |  6 |  32 |  25 | 89
// AMS | 2019 |  1 |  7 |  42 |  23 | 89
// AMS | 2019 |  1 |  8 |  68 |  75 | 92
// AMS | 2019 |  1 |  9 |  98 |  42 | 86
// AMS | 2019 |  1 | 10 |  92 |  12 | 76
// AMS | 2019 |  1 | 11 |  66 |   0 | 71
// AMS | 2019 |  1 | 12 |  90 |  56 | 85
// AMS | 2019 |  1 | 13 |  83 | 139 | 90

编辑1:我厌倦了使用map函数,我能够计算数学计算,如何在WeatherId定义的这些值前面添加键?

            case class Weather( loc: String, yyyy: Int, mm: Int, dd: Int,dr: Double, tg: Double, ug: Double)
            case class WeatherId(loc: String, yyyy: Int, mm: Int, dd: Int)

                   val rows = dataset1
                                        .map(line => Weather(
                                              line.getAs[String]("loc"),
                                              line.getAs[Int]("yyyy"),
                                              line.getAs[Int]("mm"),
                                              line.getAs[Int]("dd"),
                                              line.getAs[Double]("dr"),
                                              line.getAs[Double]("tg"),
                                              line.getAs[Double]("ug")
                                                            ) )


                  val pred_factor   = rows
                                        .map(x => (( ((x.dr * betaz) + (x.tg * betay)) + (x.ug) * betaz)))

谢谢

scala apache-spark cassandra spark-cassandra-connector
1个回答
0
投票

TL; DR; 使用Dataframe / Dataset而不是RDD。

关于RDD的DF的论证很长,但缺点是DF及其结构化替代DS'优于低级RDD。

使用spark-cassandra连接器,configure input split size可以决定火花中分区大小的大小,更多的分区更加平行。

val lastdf = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "words",
    "keyspace" -> "test" ,
    "cluster" -> "ClusterOne",
    "spark.cassandra.input.split.size_in_mb" -> 48 // smaller size = more partitions
    )
  ).load()
© www.soinside.com 2019 - 2024. All rights reserved.