Parsing a Spark RDD after joining with Cassandra


After joining with Cassandra I have an RDD, but I can't work out how to parse the resulting RDD. Here are the details.

case class IP(key: String, key2: String, key3: String, key4: String, key5: String, key6: String, key7: String, key8: String, key9: String, key10: String, key11: String, key12: String, key13: String, key14: String, key15: String, column1: String, column2: String, column3: String, column4: String, column5: String, value1: String)

val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))


val joinWithRDD = a.joinWithCassandraTable("key","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")

scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19

The above shows the RDD's type. The output of the RDD looks like this:

(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})

(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})

I'm not sure how to parse this RDD. I want to sum the last column of IP and the value1 column from the Cassandra row.

Let me know if any other details are needed. Thanks for the help.

apache-spark rdd spark-cassandra-connector
1 Answer

You need to do something like this (I haven't checked the code, so it may need small adjustments; I'm assuming that value1 has an integer type in Cassandra):

joinWithRDD.map { case (ip, row) =>
  // add the value1 found in Cassandra to the value1 already carried in IP
  val newVal = ip.value1.toInt + row.getInt("value1")
  // keep all other fields of the IP record, replacing only value1
  ip.copy(value1 = newVal.toString)
}
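
If instead you want a single total across all rows rather than a per-row sum, a minimal sketch (assuming both values always parse as integers):

// one number for the whole RDD: per-row sum of the two columns, then reduced
val total = joinWithRDD
  .map { case (ip, row) => ip.value1.toInt + row.getInt("value1") }
  .reduce(_ + _)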

joinWithCassandraTable returns a tuple with your data as _1 and the matching row found in Cassandra as _2. When accessing the Cassandra data you can either use getter functions such as getInt, getString, etc., or map the Row to a case class as described in the documentation for the Spark Cassandra Connector.
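
For example, a minimal sketch of mapping the Cassandra side straight to a case class instead of CassandraRow (the Value1 class name is illustrative, the keyspace/table names are taken from your question, and it assumes value1 is an int column):

import com.datastax.spark.connector._

case class Value1(value1: Int)

val joined = a
  .joinWithCassandraTable[Value1]("key", "tbl")
  .on(SomeColumns("key", "key2", "key3", "key4", "key5", "key6", "key7", "key8",
    "key9", "key10", "key11", "key12", "key13", "key14", "key15"))
  .select("value1")

// each element is now (IP, Value1), so the Cassandra value is a plain field
joined.map { case (ip, v) => ip.value1.toInt + v.value1 }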
