Passing the Spark context as a parameter between files in PySpark

Problem description (votes: 1, answers: 1)

Relevant code snippets:

File 1: master.py

# Spark imports
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext

# Import self-defined function
from helper import enrichment


def ingestion(sc, ssc):
    # Work with the stream; zkQuorum and topic are defined elsewhere in the
    # full code (elided in the question)
    kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "streaming-consumer", {topic: 1})
    # Call the function defined in helper.py; kafkaStream_json (the JSON-parsed
    # stream), client_id, and machine_id are likewise defined in the elided code
    enriched_data = kafkaStream_json.map(lambda single_log: enrichment(single_log, client_id, machine_id))

if __name__ == "__main__":
    # Name of Spark App
    conf = SparkConf().setAppName("Test")

    # Spark and Spark streaming configuration
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ingestion(sc, ssc)
    # Start the stream and keep it running unless terminated
    ssc.start()
    ssc.awaitTermination()

File 2: helper.py

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

def enrichment(single_log, client_id, machine_id):
    test_df = pd.DataFrame(some operations...)      # pandas work elided in the question
    spark_df = sqlContext.createDataFrame(test_df)  # fails: sqlContext is never created here
    ...

Problems encountered:

The streaming part of this works fine, but when I call the enrichment function, these are the problems I run into, depending on how it is used:

Case 1: When running the example above as-is, it says:

spark_df = sqlContext.createDataFrame(test_df)
NameError: global name 'sqlContext' is not defined

Case 2: When I pass the Spark context as a parameter, this is the message shown:

"Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."

This is the closest solution I found: ERROR: SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063

However, it does not seem to solve my problem. Any leads would be greatly appreciated.

I need these to be two separate files; inlining will not work. The code is run using:

sudo $SPARK_HOME/spark-submit --master local[2] /home/user/master.py

apache-spark pyspark apache-spark-sql
1 Answer

1 vote

I think you should use SparkSession. Case 1 fails simply because helper.py never creates the sqlContext it references; since Spark 2.0, SparkSession subsumes SQLContext anyway.
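As for Case 2: the lambda you pass to map is pickled and shipped to the workers, and any context captured in that closure goes with it, which is exactly what SPARK-5063 forbids. The failing call presumably looked something like this (a reconstruction; the exact argument list is an assumption):

# Anti-pattern: sc is captured by the closure, so Spark tries to
# serialize the SparkContext for the workers and raises SPARK-5063
enriched_data = kafkaStream_json.map(lambda single_log: enrichment(single_log, sc))

With SparkSession instead: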

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
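getOrCreate() returns the already-active session if one exists, so calling it again in another file or function hands back the same session rather than creating a second one.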

You can pass spark as a parameter to the enrichment function:

def enrichment(spark):
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...
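Note that this variant only helps if enrichment is invoked on the driver; if it is called inside a transformation such as map, the session is captured in the closure and you are back to the same SPARK-5063 serialization error.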

Or:

def enrichment():
    spark = SparkSession.builder.getOrCreate()
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...
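Putting it together for your streaming setup: creating DataFrames has to stay on the driver either way, and the usual pattern for that in the Spark Streaming docs is foreachRDD, whose function runs on the driver once per batch. A minimal sketch along those lines, where the Kafka settings and the pandas step are placeholders for the parts elided in the question:

# helper.py (sketch)
import pandas as pd
from pyspark.sql import SparkSession

def enrichment(rdd):
    # Runs on the driver inside foreachRDD, so using the session here is safe
    if rdd.isEmpty():
        return
    spark = SparkSession.builder.getOrCreate()
    test_df = pd.DataFrame(rdd.collect())  # assumes each batch is small enough to collect
    spark_df = spark.createDataFrame(test_df)
    spark_df.show()

# master.py (sketch)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from helper import enrichment

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").getOrCreate()
    ssc = StreamingContext(spark.sparkContext, 1)
    zkQuorum, topic = "localhost:2181", "test"  # placeholders
    kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "streaming-consumer", {topic: 1})
    # foreachRDD executes enrichment on the driver for each batch, so no
    # context or session ever crosses a closure boundary
    kafkaStream.foreachRDD(enrichment)
    ssc.start()
    ssc.awaitTermination()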