将从joinWithCassandraTable获取的CassandraRow转换为DataFrame

问题描述 投票:1回答:1
case class SourcePartition(id: String, host:String ,bucket: Int)
joinedRDDs =partitions.joinWithCassandraTable("db_name","table_name")
joinedRDDs.values.foreach(println)

我必须使用joinWithCassandraTable,如何将结果CassandraRow转换为DataFrame?或者与DataFrame有任何等价的joinWithCassandraTable?

我必须一次读取很多分区,我知道Datastax Cassandra连接器Predicate按下,但它允许一次只拉一个分区(它似乎不允许IN运算符,只有似乎支持)

apache-spark cassandra spark-cassandra-connector
1个回答
0
投票
val spark: SparkSession = SparkSession.builder().master("local[4]").appName("RDD2DF").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    val internalJoinRDD = spark.sparkContext.cassandraTable("test", "test_table_1").joinWithCassandraTable("test", "table_table_2")
    internalJoin.toDebugString

    internalJoinRDD.toDF()

你能试试上面的代码片段吗?

如果您有数据架构,则可以使用

def createDataFrame(internalJoinRDD: RDD[Row], schema: StructType): DataFrame
© www.soinside.com 2019 - 2024. All rights reserved.