After joining with Cassandra I have an RDD, but I can't work out how to parse the resulting RDD. Here are the details:
case class IP (key: String, key2: String,key3: String,key4: String,key5: String,key6: String,key7: String,key8: String,key9: String,key10: String,key11: String,key12: String,key13: String,key14: String,key15: String,column1:String,column2:String,column3:String,column4:String,column5:String,value1:String)
val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))
val joinWithRDD = a.joinWithCassandraTable("key","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")
scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19
The type of the RDD is shown above. Its output looks like this:
(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})
(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})
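I'm not sure how to parse this RDD. I want to add the last column of IP (value1) to the value1 column of the Cassandra row.
Let me know if any other details are needed, and thanks for the help.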
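You need to do something like the following (I haven't tested the code, so it may need small adjustments; I'm assuming that value1
has an integer type in Cassandra):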
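joinWithRDD.map { case (ip, row) =>
  // ip is your IP record; row is the CassandraRow found for its key columns
  val newVal = ip.value1.toInt + row.getInt("value1")
  // copy keeps all other fields of IP unchanged and replaces only value1
  ip.copy(value1 = newVal.toString)
}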
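joinWithCassandraTable returns tuples with your data as _1 and the data found in Cassandra as _2. To access the Cassandra data you can use getter functions such as getInt, getString, etc., or you can map the Row to a case class, as described in the documentation for Spark Cassandra Connector.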
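To check the tuple handling without a Spark cluster, here is a minimal sketch using plain Scala collections. It assumes a shortened version of the question's IP class (most key/column fields dropped) and a hypothetical `Value1Row` case class standing in for the CassandraRow, since the join selects only value1. Neither of these is the connector's API; they only mimic the shape of the `(IP, row)` pairs.

```scala
object JoinResultSketch {
  // Shortened, hypothetical version of the question's IP case class:
  // most key/column fields are omitted, value1 is kept as a String.
  case class IP(key: String, key2: String, key3: String, value1: String)

  // Hypothetical stand-in for CassandraRow; the join selected only "value1".
  // In the real job, row.getInt("value1") plays the role of row.value1.
  case class Value1Row(value1: Int)

  // Adds the Cassandra value to IP.value1 and keeps every other field unchanged.
  def addValue1(joined: Seq[(IP, Value1Row)]): Seq[IP] =
    joined.map { case (ip, row) =>
      ip.copy(value1 = (ip.value1.toInt + row.value1).toString)
    }

  def main(args: Array[String]): Unit = {
    // Each pair mirrors one element of the join result:
    // _1 is the record from your RDD, _2 is the row found in Cassandra.
    val joined = Seq(
      (IP("2_2_6", "AA", "FF", "3580"), Value1Row(3580)),
      (IP("2_2_6", "BB", "GG", "154"), Value1Row(154))
    )

    addValue1(joined).foreach(println)

    // The same sum using positional tuple access instead of pattern matching.
    val totals = joined.map(t => t._1.value1.toInt + t._2.value1)
    println(totals.mkString(","))
  }
}
```

On the real CassandraJoinRDD the same map body applies, with row.getInt("value1") in place of row.value1. Using copy avoids listing all 21 constructor arguments of IP by hand.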