由于一些不幸的事件序列,我们最终在s3上存储了一个非常零散的数据集。表元数据存储在Glue上,数据用“ bucketBy”写入,并以镶木地板格式存储。
当我们将此数据集加载到Spark / EMR上时,我们最终使每个spark分区在s3上有大约8000个文件。由于我们以列格式存储数据;根据我们需要几个字段的用例,我们并不是真正读取所有数据,而是存储的一小部分。
基于工作节点上的CPU利用率,我可以看到每个任务(每个分区运行)几乎占用了20%的CPU,我怀疑这是由于每个任务有一个线程从s3顺序读取文件,所以很多IOwait ...
是否有一种方法可以鼓励EMR上的spark任务从多线程s3中读取数据,以便我们可以同时从一个任务中的s3中读取多个文件?这样,我们可以利用80%的空闲CPU来加快处理速度?
使用Spark数据帧读取S3数据有两个部分:
发现通常发生在驱动程序上。某些托管Spark环境进行了优化,这些优化使用群集资源来加快发现速度。除非您的对象超过100K,否则通常不会出现问题。如果您具有.option("mergeSchema", true)
,则发现会较慢,因为必须触摸每个文件才能发现其模式。
读取S3文件是执行动作的一部分。读取的并行度为min(分区数,可用内核数)。从理论上讲,更多的分区+更多的可用内核意味着更快的I / O。实际上,如果您没有定期访问这些文件以使S3扩大其可用性,则S3的速度可能会非常慢。因此,在实践中,额外的Spark并行处理的收益递减。观察每个活动核心的总网络RW带宽,并调整执行以获取最高价值。
您可以通过df.rdd.partitions.length
发现分区数。
如果S3 I / O吞吐量低,您还可以做其他事情:
请确保S3上的数据具有前缀(请参阅https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html)是分散的。
打开一个AWS支持请求,并要求将您的数据前缀放大。
具有不同节点类型的实验。我们发现优化存储的节点具有更好的有效I / O。
希望这会有所帮助。