How to fix an OutOfMemoryError on a Dataproc cluster running pyspark code?


I am writing pyspark code in which I connect to a BigQuery table and read that source table in as a df. The process requires renaming the df's column names, so I defined a dictionary that essentially hard-codes the mapping.

cols_new_to_original = {'colA_new':'colA_original', 'colB_new':'colB_Original'...}

This dictionary has roughly 3,000+ key:value pairs. I then rename the df's columns with cols_new_to_original using the following steps.

Code:

# Replace column names using the cols_new_to_original
df = df.repartition(30)
for new_name, original_name in cols_new_to_original.items():
    df = df.withColumnRenamed(new_name, original_name)
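Note that each withColumnRenamed call returns a new DataFrame whose logical plan adds a projection on top of the previous one, so 3,000+ renames leave the driver re-analyzing an increasingly deep plan on every call. A purely illustrative way to watch this happen (assuming an active SparkSession and a small sample of the data) is:

# Illustrative only: print the plan every 1000 renames to see it deepen.
# Each withColumnRenamed wraps the previous plan in another projection.
for i, (new_name, original_name) in enumerate(cols_new_to_original.items()):
    df = df.withColumnRenamed(new_name, original_name)
    if i % 1000 == 0:
        df.explain(True)  # the parsed/analyzed plan grows with every rename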

While doing this, I get the following error:

Traceback (most recent call last):
  File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 132, in <module>
    score()
  File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 90, in score
    df = df.withColumnRenamed(new_name, original_name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 2475, in withColumnRenamed
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o118.withColumnRenamed.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.reflect.Array.newInstance(Array.java:75)

Below is my cluster configuration:

cluster_config = {
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-2",
            "disk_config": {
                "boot_disk_size_gb": 500
            }
        },
        "worker_config": {
            "num_instances": 8,
            "machine_type_uri": "n2-standard-8",
            "disk_config": {
                "boot_disk_size_gb": 1000
            }
        },
        "secondary_worker_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-8",
            "disk_config": {
                "boot_disk_size_gb": 1000
            },
            "preemptibility": "NON_PREEMPTIBLE"
        },
        "software_config": {
            "image_version": "2.0.27-centos8",
            "optional_components": [
                "JUPYTER"
            ],
        "properties": {
            "spark:spark.dynamicAllocation.enabled": "true",
            "spark:spark.dynamicAllocation.minExecutors": "1",
            "spark:spark.dynamicAllocation.maxExecutors": "10",
            "spark:spark.shuffle.service.enabled": "true"
            }
        }, .............................

Initially I also tried

"spark:spark.executor.cores": "2"
"spark:spark.executor.memory": "16g"

but I ran into the same problem.
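One thing the stack trace above suggests: the GC overhead limit exceeded is raised through the Py4J gateway while the driver is executing withColumnRenamed (a lazy, driver-side operation), so it is the driver JVM heap that fills up rather than executor memory, which would explain why the executor settings above made no difference. Purely as a hedged sketch (the machine type and value are illustrative, not a recommendation, and the driver heap has to fit the master node's RAM), driver-side sizing in the cluster config could look like:

"master_config": {
    "num_instances": 1,
    "machine_type_uri": "n2-highmem-4"
},
"software_config": {
    "properties": {
        "spark:spark.driver.memory": "8g"
    }
}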

apache-spark google-cloud-platform pyspark google-cloud-dataproc airflow-2.x
1 Answer

Thanks @Dagang for the suggestion. It helped.

I also had to change the way I rename the columns.

New code:

from pyspark.sql.functions import col

df = df.select([col(c).alias(cols_new_to_original.get(c, c)) for c in df.columns])

This worked.
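This works because the whole renaming is expressed as a single select, i.e. one projection that the driver analyzes once, instead of 3,000+ chained withColumnRenamed calls that each rebuild and re-analyze the plan. An equivalent one-pass sketch (on the same assumption that cols_new_to_original maps the current names to the desired ones) uses DataFrame.toDF:

# Rename every column in one pass; columns not in the mapping keep their name.
df = df.toDF(*[cols_new_to_original.get(c, c) for c in df.columns])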
