我创建了一个对象,它在运行
__init__
函数时从字典创建了一个地图。这是在任何函数或类之外完成的。所以它在导入期间加载模块时运行。
我运行它时它工作正常,但是当我使用 SparkStreaming 运行它时,我得到如下所示的断言错误。它被抛入类的
__init__
函数中。
为什么我只有在使用 Spark Streaming 时才会遇到这个问题,我该如何解决?
File "some_file.py", line 58, in __init__
some_map = F.create_map(*[F.lit(x) for x in chain(*some_dict.items())])
File "some_file.py", line 58, in <listcomp>
some_map = F.create_map(*[F.lit(x) for x in chain(*some_dict.items())])
File "/databricks/spark/python/pyspark/sql/functions.py", line 139, in lit
return col if isinstance(col, Column) else _invoke_function("lit", col)
File "/databricks/spark/python/pyspark/sql/functions.py", line 85, in _invoke_function
assert SparkContext._active_spark_context is not None
AssertionError
您遇到的 AssertionError 是由于调用 PySpark 的 SQL 函数模块中的 lit() 函数时 SparkContext 尚未初始化。当您尝试在 Spark 作业上下文之外使用 PySpark 代码时,通常会发生这种情况,例如在独立脚本中或在由在 Spark 作业上下文之外运行的其他代码导入的模块中。
在提问之前尝试这段代码并阅读更多代码示例
from pyspark.sql.functions import create_map, lit
from pyspark.streaming import StreamingContext
# create the Spark Streaming context
ssc = StreamingContext(sparkContext, batchDuration=5)
# define the class that creates the map
class MapCreator:
def __init__(self, some_dict):
# create the map using PySpark's create_map and lit functions
some_map = create_map(*[lit(x) for x in chain(*some_dict.items())])
# do something with the map...
# create an instance of the MapCreator class within a Spark Streaming job
def process_stream(stream):
map_creator = MapCreator(some_dict)
# do something with the MapCreator instance...
# create a DStream from the stream source and apply the process_stream function
stream = ssc.socketTextStream("localhost", 9999)
stream.foreachRDD(process_stream)
# start the Spark Streaming context
ssc.start()
ssc.awaitTermination()