如何在所有工作节点上并行写入 Pyspark 数据帧列表?

问题描述 投票:0回答:1

我正在尝试运行一个基本的 AWS Glue 4.0 作业,该作业运行转换函数并返回数据帧列表:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from transform import transform
from pyspark.sql.functions import lit
from datetime import datetime

# ========================================== CONTEXT INITIALIZATION ====================================================
args = getResolvedOptions(sys.argv)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ======================================================================================================================



pyspark_df_list = transform(inputs)


// NOT SURE what to do here to achieve parallelization

# ======================================================================================================================

job.commit()

我尝试过的事情:

  1. 遍历列表似乎破坏了并行化,本质上使写入操作成为单线程。
  2. 创建
    write_df
    函数并尝试从 SpaceContext 调用
    parallelize/forEach
    函数会出现以下错误:

RuntimeError:您似乎正在尝试从广播变量、操作或转换引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在工作程序上运行的代码中使用。有关更多信息,请参阅 SPARK-5063。

apache-spark pyspark parallel-processing aws-glue distributed-system
1个回答
0
投票

使用

reduce
/
unionAllByName
创建单个 DataFrame,然后写入。如果仍然无法达到您所需的并行度,请用户
repartition
在数据中创建更多分区(写入期间 1 个分区 = 1 个工作进程):

from functools import reduce

df = reduce(lambda x,y: x.unionAllByName(y), pyspark_df_list)

df.write.repartition(128).format(....
© www.soinside.com 2019 - 2024. All rights reserved.