I'm not very experienced with Spark, Spark contexts, and handling them... please advise on the following problem.

I used to run a test that created a mock Spark context in Python, and it worked fine. Since we moved from Python 3.9.6 to 3.12.2, this code raises an exception when trying to create a simple DataFrame with a schema. Here is a simplified version of my code, which raises the same error:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    TimestampType,
)

BODACC_SCHEMA = StructType(
    [
        StructField("siren", StringType(), True),
        StructField("identifier", StringType(), True),
        StructField("nojo", StringType(), True),
        StructField("type_annonce", StringType(), True),
        StructField("date_publication", TimestampType(), True),
        StructField("source_identifier", StringType(), True),
        StructField("data_source", StringType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("updated_at", TimestampType(), True),
    ]
)

spark_session = SparkSession.builder.master("local[1]").appName("LocalExample").getOrCreate()
spark_session.createDataFrame([], BODACC_SCHEMA)
This is the exception that is raised:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 546, in _dynamic_function_reduce
    state = _function_getstate(func)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 157, in _function_getstate
    f_globals_ref = _extract_code_globals(func.__code__)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle.py", line 334, in _extract_code_globals
    out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
                 ~~~~~^^^^^^^
IndexError: tuple index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/sql/session.py", line 894, in createDataFrame
    return self._create_dataframe(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/sql/session.py", line 938, in _create_dataframe
    jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/rdd.py", line 3113, in _to_java_object_rdd
    return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
                                                ^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/rdd.py", line 3505, in _jrdd
    wrapped_func = _wrap_function(
                   ^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/rdd.py", line 3362, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/rdd.py", line 3345, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
                      ^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.12.2/envs/data_pipelines/lib/python3.12/site-packages/pyspark/serializers.py", line 468, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range
Why is this happening, and how can I fix it?

Edit: I also tried pyspark==3.3.0, but it doesn't work either.
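For what it's worth, since the traceback dies inside PySpark's vendored cloudpickle rather than in my schema, I added a small sanity check to the test that records the interpreter it actually runs under and flags the 3.12 jump. This is just an illustrative sketch (the variable names and the version guard are mine, not from PySpark):

```python
import sys

# The failure appeared only after we moved from Python 3.9.6 to 3.12.2, so
# capture the interpreter the test suite is actually running under before
# building the SparkSession.
running = "{}.{}.{}".format(*sys.version_info[:3])
print(f"Running under Python {running}")

# Illustrative guard: flag interpreters that the installed PySpark's bundled
# cloudpickle may not be able to handle (the traceback above fails inside
# pyspark/cloudpickle while pickling the command for the empty RDD).
cloudpickle_at_risk = sys.version_info >= (3, 12)
if cloudpickle_at_risk:
    print("warning: Python >= 3.12 detected; the cloudpickle bundled with "
          "older PySpark releases may not serialize this interpreter's bytecode")
```

This doesn't fix anything by itself, but it makes the CI logs show immediately which interpreter each failing run used.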