Joining a Spark RDD with a Cassandra table

Question · votes: 0 · answers: 1

I am joining a Spark RDD with a Cassandra table (used as a lookup), but there are a few things I can't figure out.

  1. Will Spark pull all records between range_start and range_end from the Cassandra table and then join them with the RDD in Spark memory, or will it push all the values from the RDD down to Cassandra and perform the join there?
  2. Where will the limit (1) be applied? (In Cassandra or in Spark?)
  3. Regardless of the limit applied (1 or 1000), will Spark always pull the same number of records from Cassandra?

Code below:

//create a dataframe with the fields required for the join with the Cassandra table
//and convert it to an RDD
val df_for_join = src_df.select(src_df("col1"), src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("my_keyspace", "my_table",
    selectedColumns = SomeColumns("col1", "col2", "col3", "col4"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > 'range_start' and created_at <= 'range_end'")
  .clusteringOrder(Ascending).limit(1)

Cassandra table details:

PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)
Tags: scala, apache-spark, cassandra, datastax
1 Answer

0 votes

joinWithCassandraTable extracts the partition/primary key values from the RDD you pass in and turns them into individual requests against the matching partitions in Cassandra. On top of that, SCC may apply additional filtering, such as what you have in the where condition. If I remember correctly (but I could be wrong), the limit will not be fully pushed down to Cassandra - each queried partition can still fetch up to limit rows.
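To make that distinction concrete, here is a minimal sketch (not from the question) assuming an SCC version that exposes perPartitionLimit, which maps to CQL's PER PARTITION LIMIT and needs Cassandra 3.6+; the keyspace, table and column names are the placeholders from the question:

import com.datastax.spark.connector._

val joined = rdd_for_join
  .joinWithCassandraTable("my_keyspace", "my_table",
    selectedColumns = SomeColumns("col1", "col2", "col3", "col4"),
    joinColumns = SomeColumns("col1", "col2"))
  // bind markers keep the timestamps out of the CQL string;
  // range_start / range_end are placeholders for real timestamp values
  .where("created_at > ? and created_at <= ?", range_start, range_end)
  // limit(1): as noted above, not necessarily pushed down in full - each queried
  // partition can still fetch up to limit rows, with the overall cap applied in Spark
  //.limit(1)
  // perPartitionLimit(1): asks Cassandra itself to return at most one row
  // per partition key
  .perPartitionLimit(1)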

You can always check where the join happens by running result_rdd.toDebugString. For my code:

val df_for_join = Seq((2, 5),(5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("test", "jt",
    selectedColumns = SomeColumns("col1", "col2", "v"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > '2020-03-13T00:00:00Z' and created_at <= '2020-03-14T00:00:00Z'")
  .limit(1)

It gives the following:

scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
 |  MapPartitionsRDD[2] at rdd at <console>:45 []
 |  MapPartitionsRDD[1] at rdd at <console>:45 []
 |  ParallelCollectionRDD[0] at rdd at <console>:45 []

If you do a "normal" join instead, you get the following:

scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19

scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
 |  MapPartitionsRDD[36] at join at <console>:49 []
 |  CoGroupedRDD[35] at join at <console>:49 []
 +-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
 +-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []
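
As a side note, when the lookup RDD is large, SCC's repartitionByCassandraReplica can be combined with joinWithCassandraTable so that each per-key request is issued from an executor co-located with the owning replica. A minimal sketch, assuming the test.jt table from the example above and that its partition key is (col1, col2):

import com.datastax.spark.connector._

// case class matching the partition key columns of test.jt
case class JtKey(col1: Int, col2: Int)

val keys = sc.parallelize(Seq(JtKey(2, 5), JtKey(5, 2)))

val localizedJoin = keys
  .repartitionByCassandraReplica("test", "jt")   // group keys by their owning replica
  .joinWithCassandraTable("test", "jt")          // still direct partition-key reads, now node-local

The repartition step does not change what is read from Cassandra, only which executor sends each request.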

More information can be found in the corresponding section of the SCC documentation.
