将 SparkDF 转换为 Pandas DF 失败

问题描述 投票:0回答:2

我有一个在 Dataproc 集群上运行的 Spark 代码,该代码将表从 BigQuery 读取到 Spark 数据帧中。在此代码中,有一个步骤我需要使用 pandas 数据框逻辑执行一些数据处理。但是,当我尝试将 Spark 数据帧转换为 pandas 数据帧时,遇到无法解决的错误。值得注意的是,这段代码在 Hadoop 上运行良好,没有任何问题。

如果您能从 Spark 数据帧转换为 pandas 数据帧来解决此问题,我将不胜感激。

df=df.toPandas()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py", line 141, in toPandas
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/frame.py", line 2317, in from_records
    mgr = arrays_to_mgr(arrays, columns, result_index, typ=manager)
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 153, in arrays_to_mgr
    return create_block_manager_from_column_arrays(
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/internals/managers.py", line 2142, in create_block_manager_from_column_arrays
    mgr._consolidate_inplace()
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/internals/managers.py", line 1829, in _consolidate_inplace
    self.blocks = _consolidate(self.blocks)
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/internals/managers.py", line 2272, in _consolidate
    merged_blocks, _ = _merge_blocks(
  File "/opt/conda/default/lib/python3.8/site-packages/pandas/core/internals/managers.py", line 2297, in _merge_blocks
    new_values = np.vstack([b.values for b in blocks])  # type: ignore[misc]
  File "<__array_function__ internals>", line 180, in vstack
  File "/opt/conda/default/lib/python3.8/site-packages/numpy/core/shape_base.py", line 282, in vstack
    return _nx.concatenate(arrs, 0)
  File "<__array_function__ internals>", line 180, in concatenate
numpy.core._exceptions.MemoryError: Unable to allocate 69.9 MiB for an array with shape (4289, 2137) and data type int64

我正在处理的代码处理的数据量相对较小,最大行数约为 10,000。然而,主要的挑战在于数据源有超过 4,000 列。在此特定示例中,代码在处理 2,137 行数据时失败。

python pandas dataframe apache-spark google-cloud-dataproc
2个回答
1
投票

spark
数据帧转换为
pandas
时,数据将不再在
dataproc
节点之间共享并收集到驱动程序机器,这就是为什么您会出现内存错误,而不是将数据转换为 pandas 尝试使用 Spark Dataframe API 来处理数据


0
投票

您遇到的错误表明,在将 Spark DataFrame 转换为 Pandas DataFrame 期间,进程内存不足。这可能是由于您的数据中包含大量列。尽管数据集的行数可能看起来不大,但高维度(即大量列)会显着增加内存使用量,特别是在需要将整个数据集保存在内存中的操作期间,例如此转换过程。

可能的解决方案:

  1. 增加内存:如果可能,增加 Spark 驱动程序节点的内存分配。这是执行 .toPandas() 操作的节点,因此需要大量内存来处理大型 DataFrame 的转换。如何执行此操作取决于您的特定环境和配置(例如,在 Spark 提交命令或 Dataproc 集群配置中调整 Spark 属性,例如 spark.driver.memory)。

  2. 减少转换前的数据大小:选择更少的列:如果您的 Pandas DataFrame 操作不需要全部 4,000 多列,请在转换 DataFrame 之前仅选择所需的列。

  3. 使用 Arrow 进行转换:PySpark 支持使用 Apache Arrow 将 Spark DataFrame 转换为 Pandas DataFrame,这比默认转换过程更高效。要使用 Arrow,您需要在 Spark 会话中启用它:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = df.toPandas()

请注意,虽然 Arrow 可以提高性能并减少内存使用量,但如果数据集的大小是内存错误的根本原因,它可能无法完全解决问题。

  1. 利用 Pandas UDF:如果转换为 Pandas DataFrame 的目的是执行某些仅在 Pandas 中更容易或可能的操作,请考虑在 Spark 中使用 Pandas UDF(用户定义函数)。这允许您将适用于 Pandas DataFrames 的函数应用到 Spark DataFrame 的每个分区,从而避免一次转换整个 DataFrame 的需要。
© www.soinside.com 2019 - 2024. All rights reserved.