我正在开发一个在 Amazon EMR 上运行的 PySpark 应用程序,其中我的任务涉及根据 DataFrame 中的 URL 下载文件。目标是在 EMR 执行器上持续下载这些文件,直到达到指定的文件大小限制。一旦达到这个限制,我打算将这些文件捆绑到一个 tar 存档中。但是,我面临着在启动 tar 文件创建过程之前正确强制执行文件大小限制的挑战。我尝试使用
mapPartitions
函数来批处理多个文件,直到它们共同达到文件大小限制,之后应将它们归档到单个 tar 文件中。
该过程应按如下方式进行:
这是我的方法的简化版本:
from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id
# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()
# Sample DataFrame of files
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["file"]
df = spark.createDataFrame(files, schema=schema)
# Define the file size limit (e.g., 100MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024 # 100MB
def download_and_tar_files(partition):
accumulated_size = 0
files_to_tar = []
for row in partition:
file = row.filename
file_path, file_size = download_file(file)
files_to_tar.append(file_path)
accumulated_size += file_size
if accumulated_size >= FILE_SIZE_LIMIT:
tar_file_name = f"{uuid.uuid4()}.tar"
create_tar_file(files_to_tar, tar_file_name)
files_to_tar = []
accumulated_size = 0
# Handle any remaining files
if files_to_tar:
tar_file_name = f"{uuid.uuid4()}.tar"
create_tar_file(files_to_tar, tar_file_name)
# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)
任何人都可以深入了解我的尺寸计算或批处理逻辑的潜在问题吗?我还尝试使用
map
函数来处理每一行文件名。尽管采用了这种方法,但我当前的实现结果是为每个单独的文件创建一个 tar 文件,无法在达到大小阈值之前累积多个文件。这会导致每个 tar 存档仅包含 one 文件的输出,而不是预期的批处理,直到满足指定的大小限制。
我在 AWS EMR 核心节点上遇到的
FILE_SIZE_LIMIT
未遵守的问题源于 Spark 分布式架构中管理和访问全局变量的方式。具体来说,FILE_SIZE_LIMIT
是一个全局变量,是在EMR集群的主节点上初始化的。但是,由于每个节点都在自己的内存和进程空间中运行,因此无法在核心(执行器)节点上直接访问该变量。
经过一番深入研究,我发现在 Spark 应用程序中,驱动程序(在主节点上运行)和执行程序(在核心节点上运行)不共享相同的内存空间。这意味着驱动程序脚本中声明的全局变量不会自动传播到执行程序。因此,执行程序节点内依赖于
FILE_SIZE_LIMIT
的任何逻辑都不会按预期运行,因为执行程序没有该变量的正确值。
为了解决这个问题,我需要显式地将
FILE_SIZE_LIMIT
值传递给执行器节点。这是通过将其作为在执行器上执行的函数的参数来实现的。