“集装箱通过纱线超出内存限制杀害。 10.4 GB 10.4 GB物理存储器中使用的”一个EMR集群上的内存75GB

问题描述 投票:30回答:5

我运行在AWS EMR 5节点火花集群的每个尺寸m3.xlarge(1个主4个从站)。我经历了146Mb bzip2压缩的CSV文件成功运行,并且结束了一个完美的聚合结果。

现在,我尝试处理集群上的〜5GB bzip2的CSV文件,但我收到此错误:

在舞台6.0(TID XXX,xxx.xxx.xxx.compute.internal)丢失任务49.2:16/11/23 17点29分53秒WARN TaskSetManager ExecutorLostFailure(执行16退出由正在运行的任务之一引起的)原因:集装箱通过纱线超出内存限制杀害。 10.4 GB 10.4 GB物理内存使用。考虑提高spark.yarn.executor.memoryOverhead。

我很困惑,为什么我得到了〜75GB集群(每3m.xlarge例如15GB)在〜10.5GB内存限制...

这里是我的EMR的配置:

[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

从我读过,设置maximizeResourceAllocation属性应该告诉EMR配置星火充分利用集群上的所有可用资源。也就是说,我应该有可用内存75GB〜......那么,为什么我得到一个〜10.5GB内存限制错误?下面是我运行的代码:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

基本上,没有什么比窗口和GROUPBY更多的汇总数据。

它开始与一些这些错误,并朝着同样的错误量增大停止。

我试着运行与--conf spark.yarn.executor.memoryOverhead火花提交,但似乎并不要么解决问题。

apache-spark emr amazon-emr bigdata
5个回答
48
投票

我觉得你的痛苦..

我们曾与纱线星火运行内存的类似问题。我们有五个64GB,16个核心虚拟机,也不管我们设置spark.yarn.executor.memoryOverhead到,我们只是不能为这些任务获得足够的内存 - 他们最终不管我们有多少内存给他们死亡。并将此作为是造成这种情况发生相对直接的星火应用。

我们想通了,物理内存使用量是在虚拟机上相当低,但对虚拟内存的使用是非常高(尽管日志抱怨物理内存)。我们在yarn.nodemanager.vmem-check-enabled设置yarn-site.xmlfalse和我们的集装箱已不再杀害了,并且应用程序表现为预期工作。

这样做更多的研究,我找到了答案,为什么这发生在这里:https://www.mapr.com/blog/best-practices-yarn-resource-management

由于在CentOS / RHEL 6有虚拟存储器的分配侵略性由于OS行为,应禁用虚拟存储器检测器或增加yarn.nodemanager.vmem-PMEM比为相对较大的值。

该页面有一个链接从IBM一个非常有用的页面:https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

总之,glibc的> 2.10改变了它的内存分配。虽然被分配虚拟内存的巨额不是世界的末日,它不与纱线的默认设置工作。

而不是设置yarn.nodemanager.vmem-check-enabled假的,你也可以在MALLOC_ARENA_MAXhadoop-env.sh环境变量设置为低的数字游戏。

我建议通过这两个页面阅读 - 的信息是非常方便的。


12
投票

如果你不使用spark-submit,你正在寻找另一种方式来指定yarn.nodemanager.vmem-check-enabled提到Duff参数,这里有2种其他方式:

Method 2

如果您使用的是JSON配置文件(您传递到AWS CLI或您boto3脚本),你就必须添加以下配置:

[{
"Classification": "yarn-site", 
  "Properties": {
    "yarn.nodemanager.vmem-check-enabled": "false"
   }
}]

Method 3

如果使用EMR控制台,添加以下配置:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]

8
投票

看到,

我在我现在工作的一个巨大的集群有同样的问题。该问题不会得到解决,以增加内存的工人。有时候在处理汇集火花将使用更多的内存比它和火花塞的工作将开始使用离堆内存。

一个简单的例子是:

如果您有需要reduceByKey它将数据集,有时,比其他一名工人agregate更多的数据,如果该数据exeeds一个工人的记忆你会得到错误信息。

添加选项spark.yarn.executor.memoryOverhead会帮助你,如果你对用于工人的内存的50%设置(只是为了测试,看看它的工作原理,你可以用更多的测试添加量少)。

但是,你需要了解如何星火与集群中的内存分配工作:

  1. 更常用的方法星火使用机器内存的75%。其余进入SO。
  2. 星火执行期间的内存two types。一部分是执行,另一种是存储。执行用于搅乱,联接聚合和等。存储用于缓存和传播数据进行的跨集群。

关于内存分配的一个好东西,如果你是不是在你执行使用缓存可以设置火花,使用sotorage空间执行工作的一部分,以避免OOM错误。正如你在火花的文档看到这一点:

这种设计保证几个理想的性质。首先,不使用缓存应用程序可以使用整个空间来执行,避免不必要的磁盘溢出。其次,那些使用缓存的应用程序可以保留最小的存储空间(R),其中它们的数据块是免疫的被驱逐。最后,这种方法提供出的现成合理的性能,适用于各种工作负载,而无需对如何存储在内部的划分的用户专业知识。

但是,我们如何使用呢?

您可以更改一些配置,MemoryOverhead配置添加到您的工作电话,但考虑添加这个太:spark.memory.fraction 0.8或0.85更改和spark.memory.storageFraction减少到0.35或0.2。

其他配置可以帮助,但它需要你的情况进行检查。硒所有这些配置here

现在,在我的情况有所帮助。

我有2.5K工人集群和RAM 2.5TB。我们所面临的OOM错误,如你的。我们只是增加了spark.yarn.executor.memoryOverhead到2048,我们启用dynamic allocation。当我们调用的工作,我们没有为工人的记忆,我们留到星火来决定。我们刚才设置的开销。

但对于一些测试我的小集群,改变执行和存储器的大小。这解决了这个问题。


3
投票

尝试重新分区。它工作在我的情况。

当它被装载write.csv()数据帧是不是在一开始就这么大。数据文件总额为10 MB左右,可以根据需要说在执行每个处理任务完全几百MB的内存。我检查分区的数量为2的时候。然后,它像生长在下列操作中与其他表加入一个雪球,添加新列。然后我跑进内存超过限制的问题,在某一步。我检查分区的数量,它仍然是2,从原始数据帧我想的。所以,我想在开始的时候重新分区了,也没有问题了。

我没有看过有关Spark和YARN许多材料呢。我所知道的是,有在节点的执行者。遗嘱执行人可以取决于资源处理很多任务。我的猜测是一个分区将被原子映射到一个任务。它的体积决定了资源的使用。如果一个分区增长得太大星火不能切它。

合理的策略是首先确定节点和容器存储器,无论是10GB或5GB。理想的情况是,两个公司可以服务所有的数据处理工作,只是一个时间问题。考虑到5GB内存设置,合理的排为一个分区,你发现,说的是1000的测试(不会失败,在处理过程中的任何步骤)之后,我们可以做到这一点为以下伪代码:

RWS_PER_PARTITION = 1000
input_df = spark.write.csv("file_uri", *other_args)
total_rows = input_df.count()
original_num_partitions = input_df.getNumPartitions()
numPartitions = max(total_rows/RWS_PER_PARTITION, original_num_partitions)
input_df = input_df.repartition(numPartitions)

希望能帮助到你!


0
投票

我有小群对同一问题上的火花2.3.1运行相对较小的工作。该作业读取拼花文件,删除使用GROUPBY / AGG /第一然后重复排序并写入新的实木复合地板。它处理的51 GB的4个节点的镶木文件(4个vcores,32GB RAM)。

作业不断失败的聚集阶段。我写bash脚本手表执行人的内存使用情况,结果发现,在第一阶段随机执行的中间开始采取双存储几秒钟。当我与GC日志相关的这一刻的时候将其与清空内存大量的满GC匹配。

我终于明白,问题是主题相关的GC。 ParallelGC和G1导致此问题不断,但ConcMarkSweepGC提高的情况。这个问题似乎只与分区的量小。我跑到哪里OpenJDK 64-Bit (build 25.171-b10)上安装了电子病历工作。我不知道这个问题的根本原因,这可能与JVM或操作系统。但它肯定是不相关的我的情况下堆或离堆的使用情况。

UPDATE1

试图甲骨文热点问题再现。

© www.soinside.com 2019 - 2024. All rights reserved.