我要寻找节省foreachpartition循环卡桑德拉整个数据帧。
我知道我可以foreachpartition内获得卡桑德拉连接器,通过使用下面的代码执行上卡桑德拉CRUD语句:
val conf: SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", "IP")
.set("spark.cassandra.auth.username", "username")
.set("spark.cassandra.auth.password", "pwd")
val cdbConnector = CassandraConnector(conf)
cdbConnector.withSessionDo(session =>
session.execute(//Insert statement)
)
但我更感兴趣的是存储卡珊德拉表中的数据框全在foreachpartition内一气呵成。
此外,当我执行下面的语句来foreachpartition中创建数据框中谱写卡桑德拉一样,我的应用程序越来越挂起,所有线程都处于等待状态。
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "***", "keyspace" -> "***")).save()
您应该能够直接使用Cassandra的API调用,你可以给一个尝试,看看它是否有什么差别
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>${spark-cassandra-connector.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra-driver-core.version}</version>
</dependency>
你应该能够保存数据,而你并不需要显式调用ForEachPartition
.saveToCassandra("schema", "tableName")