AssertionError: SparkContext._active_spark_context 不是 None

问题描述 投票:0回答:1

我创建了一个对象,它在运行

__init__
函数时从字典创建了一个地图。这是在任何函数或类之外完成的。所以它在导入期间加载模块时运行。

我运行它时它工作正常,但是当我使用 SparkStreaming 运行它时,我得到如下所示的断言错误。它被抛入类的

__init__
函数中。

为什么我只有在使用 Spark Streaming 时才会遇到这个问题,我该如何解决?

File "some_file.py", line 58, in __init__
    some_map = F.create_map(*[F.lit(x) for x in chain(*some_dict.items())])
  File "some_file.py", line 58, in <listcomp>
    some_map = F.create_map(*[F.lit(x) for x in chain(*some_dict.items())])
  File "/databricks/spark/python/pyspark/sql/functions.py", line 139, in lit
    return col if isinstance(col, Column) else _invoke_function("lit", col)
  File "/databricks/spark/python/pyspark/sql/functions.py", line 85, in _invoke_function
    assert SparkContext._active_spark_context is not None
AssertionError
python apache-spark pyspark
1个回答
0
投票

您遇到的 AssertionError 是由于调用 PySpark 的 SQL 函数模块中的 lit() 函数时 SparkContext 尚未初始化。当您尝试在 Spark 作业上下文之外使用 PySpark 代码时,通常会发生这种情况,例如在独立脚本中或在由在 Spark 作业上下文之外运行的其他代码导入的模块中。

在提问之前尝试这段代码并阅读更多代码示例

from pyspark.sql.functions import create_map, lit
from pyspark.streaming import StreamingContext

# create the Spark Streaming context
ssc = StreamingContext(sparkContext, batchDuration=5)

# define the class that creates the map
class MapCreator:
    def __init__(self, some_dict):
        # create the map using PySpark's create_map and lit functions
        some_map = create_map(*[lit(x) for x in chain(*some_dict.items())])
        # do something with the map...

# create an instance of the MapCreator class within a Spark Streaming job
def process_stream(stream):
    map_creator = MapCreator(some_dict)
    # do something with the MapCreator instance...

# create a DStream from the stream source and apply the process_stream function
stream = ssc.socketTextStream("localhost", 9999)
stream.foreachRDD(process_stream)

# start the Spark Streaming context
ssc.start()
ssc.awaitTermination()
© www.soinside.com 2019 - 2024. All rights reserved.