确保在 EMR 上的 PySpark 中批量处理下载时遵守文件大小限制

问题描述 投票:0回答:1

我正在开发一个在 Amazon EMR 上运行的 PySpark 应用程序,其中我的任务涉及根据 DataFrame 中的 URL 下载文件。目标是在 EMR 执行器上持续下载这些文件,直到达到指定的文件大小限制。一旦达到这个限制,我打算将这些文件捆绑到一个 tar 存档中。但是,我面临着在启动 tar 文件创建过程之前正确强制执行文件大小限制的挑战。我尝试使用

mapPartitions
函数来批处理多个文件,直到它们共同达到文件大小限制,之后应将它们归档到单个 tar 文件中。

该过程应按如下方式进行:

  1. 下载文件。
  2. 跟踪这些下载文件的累积大小。
  3. 一旦累积大小达到预定义的限制,请使用这些文件创建一个 tar 文件。
  4. 重置跟踪并重新开始处理下一批文件,直到所有文件都处理完毕。

这是我的方法的简化版本:

from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id

# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()

# Sample DataFrame of files
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["file"]
df = spark.createDataFrame(files, schema=schema)

# Define the file size limit (e.g., 100MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024  # 100MB

def download_and_tar_files(partition):
    accumulated_size = 0
    files_to_tar = []

    for row in partition:
        file = row.filename
        file_path, file_size = download_file(file)
        files_to_tar.append(file_path)
        accumulated_size += file_size

        if accumulated_size >= FILE_SIZE_LIMIT:
            tar_file_name = f"{uuid.uuid4()}.tar"
            create_tar_file(files_to_tar, tar_file_name)
            files_to_tar = []
            accumulated_size = 0

    # Handle any remaining files
    if files_to_tar:
        tar_file_name = f"{uuid.uuid4()}.tar"
        create_tar_file(files_to_tar, tar_file_name)

# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)

任何人都可以深入了解我的尺寸计算或批处理逻辑的潜在问题吗?我还尝试使用

map
函数来处理每一行文件名。尽管采用了这种方法,但我当前的实现结果是为每个单独的文件创建一个 tar 文件,无法在达到大小阈值之前累积多个文件。这会导致每个 tar 存档仅包含 one 文件的输出,而不是预期的批处理,直到满足指定的大小限制。

python apache-spark pyspark amazon-emr
1个回答
0
投票

我在 AWS EMR 核心节点上遇到的

FILE_SIZE_LIMIT
未遵守的问题源于 Spark 分布式架构中管理和访问全局变量的方式。具体来说,
FILE_SIZE_LIMIT
是一个全局变量,是在EMR集群的主节点上初始化的。但是,由于每个节点都在自己的内存和进程空间中运行,因此无法在核心(执行器)节点上直接访问该变量。

经过一番深入研究,我发现在 Spark 应用程序中,驱动程序(在主节点上运行)和执行程序(在核心节点上运行)不共享相同的内存空间。这意味着驱动程序脚本中声明的全局变量不会自动传播到执行程序。因此,执行程序节点内依赖于

FILE_SIZE_LIMIT
的任何逻辑都不会按预期运行,因为执行程序没有该变量的正确值。

为了解决这个问题,我需要显式地将

FILE_SIZE_LIMIT
值传递给执行器节点。这是通过将其作为在执行器上执行的函数的参数来实现的。

© www.soinside.com 2019 - 2024. All rights reserved.