我尝试过使用不同大小的集群(AWS 上的 EMR),但由于 YARN 杀死所有节点,它总是失败: https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/
我认为这是由于内存要求太高,但我有一个 10 个 m5.4xlarge 实例(64Giga RAM)的集群,它仍然失败。
Pyspark 代码:
num_of_ints = int(size_in_mb * 1024 * 1024 / 4)
max_int = 2147483647
# Create a SparkSession
spark = SparkSession\
.builder\
.appName("GenerateRandomData") \
.getOrCreate()
# Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))
# Save the DataFrame to a Parquet file
out_file = os.path.join(out_folder, 'random_list.parquet')
partitions = math.ceil(size_in_mb/10000) # the parquet file will be broken to chunks of 10giga
df.repartition(partitions).write.mode("overwrite").parquet(out_file)
# Stop the SparkSession
spark.stop()
我愿意以任何其他方式创建一个 50Giga 的镶木地板文件,其中包含随机整数。
此外,数据生成阶段仅由 2 个任务完成,但我的集群中有大约 140 个核心:
谢谢!
delta 表的核心是 parquet 文件。正如我们在 spark 中所知,除非你重新分区 (1),否则你有多个分区。
让我们使用迭代方法将数据添加到我们的增量表,直到行数或大小符合您的目标。
下面的代码创建一个新的数据库。
%sql
CREATE DATABASE stack
下面的代码创建一个新表。
%sql
CREATE TABLE someints (id INT, value INT, stamp STRING);
创建一个函数以将数据添加到增量表中。
# required libraries
from pyspark.sql.functions import *
from datetime import datetime
def add_random_data():
# get variables
num_of_ints = int(1024 * 1024)
max_int = 2147483647
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
table_name = "stack.someints"
# create dataframe
df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer")).withColumn("stamp", lit(stamp))
# write data frame
df.write.mode("append").format("delta").saveAsTable(table_name)
我每次调用添加一百万行。您可以使用 for in range 循环来重复调用名为 add_random_data() 的函数。
%sql
select stamp, count(*) as cnt from stack.someints group by stamp
我正在使用 stamp 来捕获我调用该函数的日期/时间。
最后但同样重要的是,我们可以使用增量表属性来获取总字节数。
# get size in bytes
spark.sql("describe detail stack.someints").select("sizeInBytes").collect()
一旦你有了正确的尺寸,你就可以随时写出你想要的任何格式。
请记住,parquet 是一种列式存储格式,制作一个 50 GB 的文件可能需要一段时间。然而,这种迭代方法应该避免内存不足的问题。