来自EMR / Spark的S3写入速度非常慢

问题描述 投票:21回答:6

我正在写,看看是否有人知道如何加速从EMR中运行的Spark的S3写入时间?

我的Spark Job需要4个多小时才能完成,但群集在前1.5个小时内仅处于负载状态。

enter image description here

我很好奇Spark一直在做什么。我查看了日志,发现了许多s3 mv命令,每个文件一个。然后直接看看S3我看到我的所有文件都在_temporary目录中。

其次,我关注我的集群成本,看来我需要为这项特定任务购买2小时的计算。但是,我最终买了5个小时。我很好奇EMR AutoScaling在这种情况下是否有助于降低成本。

一些文章讨论了更改文件输出提交器算法,但我没有成功。

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

写入本地HDFS很快。我很好奇,如果发出一个hadoop命令将数据复制到S3会更快吗?

enter image description here

amazon-web-services apache-spark amazon-s3 amazon-emr
6个回答
11
投票

你看到的是outputcommitter和s3的问题。提交作业在_temporary文件夹上应用fs.rename,并且由于S3不支持重命名,这意味着单个请求现在正在复制并删除从_temporary到其最终目标的所有文件。

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")仅适用于hadoop版本> 2.7。它的作用是从提交任务中的_temporary复制每个文件而不是提交作业,因此它是分布式的并且工作得非常快。

如果您使用旧版本的hadoop,我会使用Spark 1.6并使用:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*请注意,它无法启用推测或以附加模式写入

**还要注意它在Spark 2.0中已弃用(由algorithm.version = 2取代)

BTW在我的团队中我们实际上使用Spark写入HDFS并在生产中使用DISTCP作业(特别是s3-dist-cp)将文件复制到S3但这是由于其他几个原因(一致性,容错性)完成的,因此没有必要..你可以使用我的建议快速写入S3。


5
投票

直接提交者从火花中被拉出,因为它不能承受故障。我强烈反对使用它。

Hadoop,s3guard正在进行工作,以添加0重命名提交者,这将是O(1)和容错;密切关注HADOOP-13786

暂时忽略“魔术提交者”,基于Netflix的登台提交者将首先发布(hadoop 2.9?3.0?)

  1. 这会在任务提交中将工作写入本地FS
  2. 发出未提交的multipart put操作来写入数据,但没有实现它。
  3. 使用原始的“算法1”文件输出提交器保存将PUT提交到HDFS所需的信息
  4. 实现一个作业提交,它使用HDFS的文件输出提交来决定要完成哪些PUT,以及取消哪些PUT。

结果:任务提交占用数据/带宽秒,但作业提交不会超过目标文件夹上1-4 GET的时间和每个挂起文件的POST,后者被并行化。

您可以选择这项工作所基于的提交者,from netflix,并且可能今天在火花中使用它。设置文件commit algorithm = 1(应该是默认值)或者它实际上不会写入数据。


4
投票

我有类似的用例,我使用spark写入s3并且有性能问题。主要原因是spark创建了大量零字节部分文件,将临时文件替换为实际文件名正在减慢写入过程。尝试下面的方法作为解决方法

  1. 将spark的输出写入HDFS并使用Hive写入s3。由于hive创建的零件文件数量较少,因此性能要好得多。我遇到的问题是(使用spark时也有同样的问题),由于安全原因,prod env中没有提供对策略的删除操作。在我的情况下,S3桶是kms加密的。
  2. 将火花输出写入HDFS并将复制的hdfs文件写入本地并使用aws s3复制将数据推送到s3。用这种方法获得了第二好的结果。亚马逊创建了门票,他们建议使用这个。
  3. 使用s3 dist cp将文件从HDFS复制到S3。这没有问题,但没有表现

1
投票

你在火花输出中看到了什么?如果你看到很多重命名操作,请阅读this


0
投票

我们在Azure上使用Spark在WASB上体验过相同的内容。我们最终决定不直接使用带有spark的distrbitued存储。我们对一个真正的hdfs://目的地进行了spark.write并开发了一个特定的工具:hadoop copyFromLocal hdfs:// wasb://然后HDFS在归档到WASB(或S3)之前是我们的临时缓冲区。


0
投票

你写的文件有多大?将一个核心写入一个非常大的文件将比分割文件慢得多,并让多个工作人员写出较小的文件。

© www.soinside.com 2019 - 2024. All rights reserved.