转换CassandraTableScanRDD org.apache.spark.rdd.RDD

问题描述 投票:1回答:1

我有以下情况。我有大型Cassandra表(有大量列),我想用Spark处理。我只希望将选定的列加载到Spark(在Cassandra服务器本身上应用选择和过滤)

 val eptable = 
 sc.cassandraTable("test","devices").select("device_ccompany","device_model","devi
 ce_type")

上面的语句给出了一个CassandraTableScanRDD,但我如何将其转换为DataSet / DataFrame?

Si还有其他任何方式我可以对列进行服务器端过滤并获取数据帧吗?

scala apache-spark dataframe cassandra rdd
1个回答
1
投票

在DataStax Spark Cassandra Connector中,您可以将Cassandra数据作为Dataset读取,并在服务器端修剪列,如下所示:

val df = spark
 .read
 .format("org.apache.spark.sql.cassandra")
 .options(Map( "table" -> "devices", "keyspace" -> "test" ))
 .load()

val dfWithColumnPruned = df
 .select("device_ccompany","device_model","device_type")

请注意,我在阅读后执行的selection操作是使用Catalyst优化推送到服务器端的。有关详细信息,请参阅此document

© www.soinside.com 2019 - 2024. All rights reserved.