使用 pyspark 创建一个 50Giga 的随机整数镶木地板文件失败

问题描述 投票:0回答:1

我尝试过使用不同大小的集群(AWS 上的 EMR),但由于 YARN 杀死所有节点,它总是失败: https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/

我认为这是由于内存要求太高,但我有一个 10 个 m5.4xlarge 实例(64Giga RAM)的集群,它仍然失败。

Pyspark 代码:


    num_of_ints = int(size_in_mb * 1024 * 1024 / 4)
    max_int = 2147483647


    # Create a SparkSession
    spark = SparkSession\
        .builder\
        .appName("GenerateRandomData") \
        .getOrCreate()

    # Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
    df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))

    # Save the DataFrame to a Parquet file
    out_file = os.path.join(out_folder, 'random_list.parquet')
    partitions = math.ceil(size_in_mb/10000) # the parquet file will be broken to chunks of 10giga
    df.repartition(partitions).write.mode("overwrite").parquet(out_file)

    # Stop the SparkSession
    spark.stop()

我愿意以任何其他方式创建一个 50Giga 的镶木地板文件,其中包含随机整数。

此外,数据生成阶段仅由 2 个任务完成,但我的集群中有大约 140 个核心: From spark UI

谢谢!

pyspark random hadoop-yarn parquet amazon-emr
1个回答
0
投票

delta 表的核心是 parquet 文件。正如我们在 spark 中所知,除非你重新分区 (1),否则你有多个分区。

让我们使用迭代方法将数据添加到我们的增量表,直到行数或大小符合您的目标。

下面的代码创建一个新的数据库。

%sql
CREATE DATABASE stack

下面的代码创建一个新表。

%sql 
CREATE TABLE someints (id INT, value INT, stamp STRING);

创建一个函数以将数据添加到增量表中。

# required libraries
from pyspark.sql.functions import *
from datetime import datetime

def add_random_data():
  
  # get variables
  num_of_ints = int(1024 * 1024)
  max_int = 2147483647
  stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
  table_name = "stack.someints"

  # create dataframe
  df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer")).withColumn("stamp", lit(stamp))

  # write data frame
  df.write.mode("append").format("delta").saveAsTable(table_name)

我每次调用添加一百万行。您可以使用 for in range 循环来重复调用名为 add_random_data() 的函数。

%sql
select stamp, count(*) as cnt from stack.someints group by stamp

我正在使用 stamp 来捕获我调用该函数的日期/时间。

最后但同样重要的是,我们可以使用增量表属性来获取总字节数。

# get size in bytes
spark.sql("describe detail stack.someints").select("sizeInBytes").collect()

一旦你有了正确的尺寸,你就可以随时写出你想要的任何格式。

请记住,parquet 是一种列式存储格式,制作一个 50 GB 的文件可能需要一段时间。然而,这种迭代方法应该避免内存不足的问题。

© www.soinside.com 2019 - 2024. All rights reserved.