Foreachpartition循环中保存数据帧卡桑德拉

问题描述 投票:1回答:1

我要寻找节省foreachpartition循环卡桑德拉整个数据帧。

我知道我可以foreachpartition内获得卡桑德拉连接器,通过使用下面的代码执行上卡桑德拉CRUD语句:

val conf: SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", "IP")
.set("spark.cassandra.auth.username", "username")
.set("spark.cassandra.auth.password", "pwd")
val cdbConnector = CassandraConnector(conf)

cdbConnector.withSessionDo(session =>
session.execute(//Insert statement)
)

但我更感兴趣的是存储卡珊德拉表中的数据框全在foreachpartition内一气呵成。

此外,当我执行下面的语句来foreachpartition中创建数据框中谱写卡桑德拉一样,我的应用程序越来越挂起,所有线程都处于等待状态。

df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "***", "keyspace" -> "***")).save()
scala apache-spark apache-spark-sql
1个回答
0
投票

您应该能够直接使用Cassandra的API调用,你可以给一个尝试,看看它是否有什么差别

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark-cassandra-connector.version}</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>${cassandra-driver-core.version}</version>
    </dependency>

你应该能够保存数据,而你并不需要显式调用ForEachPartition

 .saveToCassandra("schema", "tableName")
© www.soinside.com 2019 - 2024. All rights reserved.