Spark Graphframes大型数据集和内存问题

问题描述 投票:1回答:1

我想在相对大的图上运行Pagerank 35亿个节点900亿个边缘。而且我一直在尝试使用不同的群集大小以使其运行。但首先是代码:

from pyspark.sql import SparkSession
import graphframes

spark = SparkSession.builder.getOrCreate()

edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF   = spark.read.parquet('s3://path/to/verts') # 25GB total size

graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()

result_df   = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')

我从一开始就经历了垃圾回收率很高的问题。因此,我为群集尝试了不同的设置和大小。我主要关注两篇文章:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

我在亚马逊EMR上运行集群。这些是我当前使用的相关设置:

"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"

"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"

"maximizeResourceAllocation": "true"

"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"

我尝试了集群大小,但第一个似乎可行的实验是具有以下参数的集群:--deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5

[使用此配置GC,时间已经很短了,但是一切正常,但是由于这是一个测试集群,因此它的总内存只有2.7 TB很小,“我也想了一段时间ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137.因为我将node留给了很少的RAM。因此,我重新运行了整个过程,但是这次用--executor-cores 5 --executor-memory 35g重新运行了我的GC问题,后面和我的集群表现得很奇怪。因此,我以为我理解了一个问题,即高GC次数的原因并不是每个执行程序的内存不足。

我启动的下一个群集具有以下参数:--deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5

因此,像以前一样,更大的群集和每个执行者更大的内存。一切运行顺利,我通过ganglia注意到第一步大约要用到RAM 5.5 TB

我虽然了解一些问题,即使用更少的内核可用于我的集群并扩大每个执行器的内存可以使程序运行更快,但我猜想它与verts_DF的大小大约为25gb无关,这样就可以适应存入每个执行器的内存中,并保留计算空间(25GB * 179几乎为5.5TB)。

因此,我启动的下一个集群具有相同数量的节点,但是我将执行程序的大小调整为:--num-executors 119 --executor-cores 5 --executor-memory 75g

立即将所有问题退回哪里!通过GC将集群挂起的ganglia次高,我可以看到RAM最多填满了9个可用TB中的8个。我感到困惑。我回过头来再次旋转了--num-executors 179 --executor-cores 5 --executor-memory 45g群集,幸运的是,使用EMR可以轻松完成,因为我可以克隆它。但是现在这种配置也不起作用。高GC倍群集立即达到已用内存的8TB

这里发生了什么?感觉就像我玩轮盘赌,有时相同的配置有效,而其他时候却没有?

apache-spark pyspark amazon-emr graphframes
1个回答
0
投票

[如果一段时间后仍然有人迷失了方向,它就会意识到问题出在graphxgraphframes如何加载图形上。两者都试图生成它们正在加载的图的所有三元组,它们具有非常大的图分辨率OOM,因为具有35亿个节点和700亿条边的图已经毁了其中的许多。我通过在pyspark中实现pagerank编写了一个解决方案。可以肯定它的速度不及scala实现,但是它可以扩展并且不会遇到所描述的三元组问题。我在github上发布了它https://github.com/thagorx/spark_pagerank

© www.soinside.com 2019 - 2024. All rights reserved.