Pyspark将RDD保存到Cassandra

问题描述 投票:0回答:1

我有RDD(test_rdd)如下

[
{'user_lname': u'TEst1', 'user_id': u'2aa8ae30-c0e5-48bb-ab16-a2ed2e78c8c3', 'user_phone': u'1234567890', 'user_fname': u'TestingTesting2', 'amount': 1222,’event_timestamp': u’2016-09-29T07:49:50.866+00:00’}, 

{'user_lname': u'TEst2', 'user_id': u'2aa8ae30-c0e5-48bb-ac16-a2ed2e78c8c3', 'user_phone': u'1234567891', 'user_fname': u'TestingTesting', 'amount': 12,’event_timestamp': u’2016-10-27T07:49:50.866+00:00’},

{'user_lname': u'TEst3', 'user_id': u'2aa8ae30-c1e5-48bb-ab16-a2ed2e78c8c3', 'user_phone': u'1234567892', 'user_fname': u'TestingTesting3', 'amount': 122,’event_timestamp': u’2016-09-27T07:49:50.866+00:00'}
]

我想将上面的RDD保存到cassandra表中。 我使用时收到以下错误

test_rdd.saveToCassandra("keyspace1","table1")  

回溯(最近一次调用最后一次):文件“/var/spark/test/k.py”,第179行,in parsed_data.saveToCassandra(“keyspace1”,“table1”)AttributeError:'PipelinedRDD'对象没有属性'saveToCassandra'

python apache-spark cassandra pyspark datastax-startup
1个回答
0
投票

要么

  • 按照官方spark-cassandra-connector的说明进行操作
  • 转换为DataFrametoDF
  • Dataframe df.write.format("org.apache.spark.sql.cassandra").options( table=table, keyspace=keyspace ).save()
© www.soinside.com 2019 - 2024. All rights reserved.