INPUT:输入数据集包含存储为实木复合地板的多个文件中的1000万笔交易。包括所有文件在内的整个数据集的大小范围为6到8GB。
问题陈述:根据客户ID对交易进行分区,这将为每个客户ID创建一个文件夹,并且每个文件夹包含该特定客户完成的所有交易。
HDFS对可以创建的根目录中子目录的数量有640万的硬限制,因此,使用客户ID的后两位(从00,01,02 ...到99)来创建顶级目录和每个顶级目录将包含所有以该特定两位数字结尾的客户ID。
示例输出目录结构:
00 / cust_id = 100900 / part1.csv00 / cust_id = 100800 / part33.csv01 / cust_id = 100801 / part1.csv03 / cust_id = 100803 / part1.csv
CODE:
// Reading input file and storing in cache
val parquetReader = sparksession.read
.parquet("/inputs")
.persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory
// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
var idEndPattern = customerIdEndingPattern + ""
if (customerIdEndingPattern < 10) {
idEndPattern = "0" + customerIdEndingPattern
}
parquetReader
.filter(col("customer_id").endsWith(idEndPattern))
.repartition(945, col("customer_id"))
.write
.partitionBy("customer_id")
.option("header", "true")
.mode("append")
.csv("/" + idEndPattern)
customerIdEndingPattern = customerIdEndingPattern + 1
}
火花配置:Amazon EMR 5.29.0(Spark 2.4.4和Hadoop 2.8.5)
1个主服务器和10个从属服务器,每个都有96个vCore和768GB RAM(Amazon AWS R5.24xlarge实例)。硬盘为EBS,胸围为3000 IOPS,持续30分钟。
'spark.hadoop.dfs.replication': '3',
'spark.driver.cores':'5',
'spark.driver.memory':'32g',
'spark.executor.instances': '189',
'spark.executor.memory': '32g',
'spark.executor.cores': '5',
'spark.executor.memoryOverhead':'8192',
'spark.driver.memoryOverhead':'8192',
'spark.default.parallelism':'945',
'spark.sql.shuffle.partitions' :'945',
'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
'spark.dynamicAllocation.enabled': 'false',
'spark.memory.fraction':'0.8',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
'spark.memory.storageFraction':'0.2',
'spark.task.maxFailures': '6',
'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"
'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"
缩放问题:
从10个实验一直到最多40个从属(相应地调整了火花配置),但结果仍然相同,该工作需要2个多小时才能完成(如第一个图片所示,每个工作都需要一分钟以上, while循环运行99次)。另外,从远程执行器读取的数据几乎不存在(这很好),大多数是本地过程。
分区似乎运行良好(请参阅第二张图片),每个实例有5个RDD块,并且始终运行5个任务(每个实例有5个核心,每个从属节点有19个实例)。 GC也进行了优化。
while循环中编写的每个partitionby任务需要一分钟或更长时间才能完成。
METRICS:
从一项工作总结基本上是按执行一个分区完整工作完成后的几个实例的摘要,因此RDD块为零,第一行是驱动程序。
因此,问题在于如何对其进行更多优化,以及为什么它没有扩大规模?有更好的方法吗?我已经达到了最高性能吗?假设我可以使用更多的硬件资源,还有什么我可以做得更好的?欢迎任何建议。
即使将数据缓存在内存中而不在下游逐出,触摸每条记录100次也是非常低效的。更不用说单独坚持很昂贵了
相反,您可以添加虚拟列
import org.apache.spark.sql.functions.substring
val df = sparksession.read
.parquet("/inputs")
.withColumn("partition_id", substring($"customer_id", -2, 2))
并稍后将其用于分区
df
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
至avoid to many small files,您可以先使用更长的后缀重新分区
val nParts: Int = ???
val suffixLength: Int = ??? // >= suffix length used for write partitions
df
.repartitionByRange(
nParts,
substring($"customer_id", -suffixLength, suffixLength)
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
这样的更改将使您可以一次处理所有数据,而无需任何显式缓存。