如何使用 sparkSession 在数据框架中写在 pyspark 使用 spark-cassandra-connector。

问题描述 投票:0回答:1

我使用的是 侏儒公园spark-cassandra-connector_2.11-2.3.0.jar。 在cassandra DB中,我从一个关键空间读取数据帧,并写入另一个不同的关键空间。这两个关键空间有不同的用户名和密码。

我使用sparkSession创建了。

spark_session = None

def set_up_spark(sparkconf,config):
    """
    sets up spark configuration and create a session
    :return: None
    """
    try:
        logger.info("spark conf set up Started")
        global spark_session
        spark_conf = SparkConf()
        for key, val in sparkconf.items():
            spark_conf.set(key, val)
        spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
        logger.info("spark conf set up Completed")
    except Exception as e:
        raise e

我使用这个sparkSession以数据帧的形式读取数据。

table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .load()

我能够使用上面的会话读取数据。spark_session被附加到上面的查询。

现在我需要创建另一个会话,因为写表的凭证是不同的,我有写查询。

table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .mode("append") \
            .save()

我找不到如何在cassandra中为上述写操作附加一个新的sparkSession。

如何在pyspark中用spark-cassandra-connector为写操作附加一个新的SparkSession?

python apache-spark pyspark cassandra spark-cassandra-connector
1个回答
0
投票

你可以简单地将这些信息作为选项传递给特定的 readwrite 操作,这包括像。spark.cassandra.connection.host,

请注意,你需要将这些选项放入一个字典中,并传递这个字典,而不是像在 文件.

read_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP1", 
  "spark.cassandra.auth.username": "username1", 
  "spark.cassandra.auth.password":"password1"}
table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(**read_options) \
            .load()

write_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP2", 
  "spark.cassandra.auth.username": "username2", 
  "spark.cassandra.auth.password":"password1"}
table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(**write_options) \
            .mode("append") \
            .save()
© www.soinside.com 2019 - 2024. All rights reserved.