Unzip multiple files from a zip file using EMR


I have several zip files named f1.zip, f2.zip, ... f7.zip, each containing roughly 200k XML files. I use the code below with multiprocessing to unzip them in parallel, but even though every individual file is tiny, the read/write overhead makes the extraction very slow. I would like to know whether there is a way to use AWS EMR to speed up the unzipping.

import os, boto3, zipfile
import sys
import multiprocessing
from io import BytesIO
from boto3.s3.transfer import TransferConfig

sys.path.insert(0, './src')

def list_s3_obj(bucket, prefix, extension):
    s3 = boto3.resource('s3')
    raw_bucket = s3.Bucket(bucket)
    file_list = []

    for file in raw_bucket.objects.filter(Prefix=prefix):
        if file.key.endswith(extension):
            file_list.append((bucket, file.key))

    return file_list

def unzip_s3_obj(bucket, zip_key, unzip_key):
    print('Unzip | ' + zip_key)

    MB = 1024 ** 2
    # Transfer settings shared by the zip download and the per-file uploads
    config = TransferConfig(
        use_threads = True,
        multipart_threshold = 1024*MB,
        max_concurrency = 4,
        io_chunksize = 300*MB,
    )
    s3 = boto3.resource('s3')

    buffer = BytesIO()
    zip_obj = s3.Object(bucket_name=bucket ,key=zip_key)
    zip_obj.download_fileobj(Fileobj=buffer, Config=config)
    z = zipfile.ZipFile(buffer)

    # Stream every archive member straight back to S3 without touching local disk
    for filename in z.namelist():
        print('Extracting | ' + filename)
        s3.meta.client.upload_fileobj(
            z.open(filename),
            Bucket=bucket,
            Key=unzip_key + filename,
            Config=config
        )

current_key = 'Folder_spot'


bucket = 's3-bucket'
raw_key = f'{current_key}/zip/'
unzip_f1_key = f'{current_key}/unzip/f1/'
unzip_f2_key = f'{current_key}/unzip/f2/'
unzip_f3_key = f'{current_key}/unzip/f3/'
unzip_f4_key = f'{current_key}/unzip/f4/'
unzip_f5_key = f'{current_key}/unzip/f5/'
unzip_f6_key = f'{current_key}/unzip/f6/'
unzip_f7_key = f'{current_key}/unzip/f7/'


def unzip_s3():
    file_list = list_s3_obj(bucket, raw_key, '.zip')

    subprocess_map = [ (*e, unzip_f1_key) for e in file_list if 'f1' in e[1].lower()]
    subprocess_map.extend([ (*e, unzip_f2_key) for e in file_list if 'f2' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f3_key) for e in file_list if 'f3' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f4_key) for e in file_list if 'f4' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f5_key) for e in file_list if 'f5' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f6_key) for e in file_list if 'f6' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f7_key) for e in file_list if 'f7' in e[1].lower()])

    p = multiprocessing.Pool(7)
    p.starmap(unzip_s3_obj, subprocess_map)

if __name__ == '__main__':
    unzip_s3()

Separately, I later use the Python library xmltodict to convert the XML files to JSON and concatenate them into files of 20k records each, which reduces the file count from 200k to about 10 and speeds up the conversion and any later processing. But once again, reading 200k XML files is very slow, and I cannot find a code example or solution that uses EMR for this.
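For reference, here is a minimal sketch of the XML-to-JSON batching step described above, assuming the XML files have already been extracted to a local directory. The directory names, function names, and output layout are illustrative assumptions, not part of the original code:

import glob
import json
import os

import xmltodict  # pip install xmltodict

BATCH_SIZE = 20_000   # records per output file, as in the question


def flush(records, out_dir, batch_no):
    # Write one JSON array per batch, e.g. part-0000.json, part-0001.json, ...
    os.makedirs(out_dir, exist_ok=True)
    with open(f'{out_dir}/part-{batch_no:04d}.json', 'w') as out:
        json.dump(records, out)


def batch_xml_to_json(xml_dir, out_dir):
    # Parse each XML file into a dict and group the dicts into 20k-record JSON files
    batch, batch_no = [], 0
    for path in sorted(glob.glob(f'{xml_dir}/*.xml')):
        with open(path, 'rb') as f:
            batch.append(xmltodict.parse(f.read()))
        if len(batch) >= BATCH_SIZE:
            flush(batch, out_dir, batch_no)
            batch, batch_no = [], batch_no + 1
    if batch:  # write the final partial batch
        flush(batch, out_dir, batch_no)


batch_xml_to_json('./unzipped_xml', './json_batches')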

python-3.x amazon-web-services pyspark amazon-emr
1 Answer

Here is a solution in Scala. I had to do this at work before, so I am extracting the relevant parts here.

A few important things to keep in mind:

If your workflow allows it, try to tar.gz the files instead of zipping them, because that is the only format I have tried (a repacking example follows these notes).

Second, repartition the RDD to a suitably large number of partitions (numPartitionsProvided in the code below) so that all of your executors are used.
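As an illustration of the first point, one of the question's archives could be repacked roughly like this (the intermediate directory name is an assumption):

unzip f1.zip -d f1_files
tar -czvf f1.tar.gz f1_files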

ZipFileReader.scala

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SparkSession

import java.nio.charset.StandardCharsets
import scala.util.Try

object ZipFileReader {

  def decode(bytes: Array[Byte]) =
    new String(bytes, StandardCharsets.UTF_8)


  def extractFilesCSV(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    Stream.continually(Option(tar.getNextTarEntry))
      // Read until the next entry is null
      .takeWhile(_.isDefined)
      // flatten
      .flatMap(x => x)
      // Drop directories
      .filter(!_.isDirectory)
      .filter(ele => ele.getName.split("/")(1).endsWith(".csv"))
      .map(e => {
        println("This could be name", e.getName)
        (e.getName.split("/")(1),
          Stream.continually {
              // Read n bytes
              val buffer = Array.fill[Byte](n)(-1)
              val i = tar.read(buffer, 0, n)
              (i, buffer.take(i))}
            // Take as long as we have read something
            .takeWhile(_._1 > 0)
            .map(_._2)
            .flatten
            .toArray)})
      .toArray
  }


  def extractFilesJSON(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    Stream.continually(Option(tar.getNextTarEntry))
      // Read until the next entry is null
      .takeWhile(_.isDefined)
      // flatten
      .flatMap(x => x)
      // Drop directories
      .filter(!_.isDirectory)
      .filter(ele => ele.getName.split("/")(1).endsWith(".json"))
      .map(e => {
        println("This could be name", e.getName)
        (e.getName.split("/")(1),
          Stream.continually {
              // Read n bytes
              val buffer = Array.fill[Byte](n)(-1)
              val i = tar.read(buffer, 0, n)
              (i, buffer.take(i))}
            // Take as long as we have read something
            .takeWhile(_._1 > 0)
            .map(_._2)
            .flatten
            .toArray)})
      .toArray
  }


  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val datafilename = "/zip-loc/myfirst.tar.gz"
    val output_path = "/zip-loc/output1/"




    val readInRawCSVFiles = spark.sparkContext.binaryFiles(datafilename).flatMapValues(x =>
      extractFilesCSV(x).toOption).map(ele => ele._2.map(inner_ele => (inner_ele._1, decode(inner_ele._2) )))

    val rawDataCSVParallelized = spark.sparkContext.parallelize(readInRawCSVFiles.first())

    //make this a large number so that all your executors are utilized
    val numPartitionsProvided = 3
    println("Writing out the files to output_path")
    rawDataCSVParallelized.repartition(numPartitions = numPartitionsProvided).map(ele => ele._2).saveAsTextFile(output_path)


    // The following code is only for sanity checking and testing everything is going smoothly
    // The following code can be removed from production file.
    rawDataCSVParallelized.foreach(ele => println("fileName within .tar.gz", ele._1))

    val countRawDataCSVParallelized = rawDataCSVParallelized.count()
    println(s"Count of the csv input files = $countRawDataCSVParallelized")


    val dataCollected = rawDataCSVParallelized.collect()
    val username1_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username1.csv")).head
    println("username1_csv")
    println(username1_csv)
    val username2_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username2.csv")).head
    println("username2_csv")
    println(username2_csv)

  }

}

I created the tar.gz file with the following command:

tar -czvf myfirst.tar.gz username_files

The username_files folder contains the following files as an example:

username1.csv
username2.csv
username3.csv
username4.csv
username5.csv
username6.csv

Upload myfirst.tar.gz to your local HDFS to try it out and check that everything works.
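For a quick local test, that upload could look something like this (the target path is an assumption matching the datafilename used in the code above):

hdfs dfs -mkdir -p /zip-loc
hdfs dfs -put myfirst.tar.gz /zip-loc/myfirst.tar.gz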
