Upgrading to AWS Graviton-based EC2 instances in EMR causes task failures

Question

I have a Spark Scala job running in EMR that I'm trying to improve. As of now it runs on m5.8xlarge instances without any issues. I recently tried upgrading to the Graviton-based m6g.8xlarge, and while the job succeeded, I saw some strange behavior: tasks failing with timeouts, stages running in an odd order, and what looked like memory pressure. The stages that ran out of order were the ones with failing tasks: stage 6 ran and failed, then stages 4 and 5 completed, and then the retry of stage 6 succeeded. On the current m5.8xlarge runs, stages 4 and 5 are skipped. I'm not sure why this happens, since the only change I made was moving from the m5 instance type to m6g, so I wanted to see whether anyone has run into something similar or has a solution. I'll also post some of the errors from the failed tasks, though I believe they are OOM-related.

Here is the main error I'm seeing:

ERROR TransportClientFactory:261 - Exception while bootstrapping client after 60041 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:263)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:116)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:89)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:257)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:109)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:264)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:614)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:609)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:442)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:160)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:66)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:173)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:259)
    ... 39 more
Tags: apache-spark, amazon-ec2, amazon-emr, aws-graviton
1 Answer

I don't think this is an out-of-memory problem. m6g.8xlarge and m5.8xlarge both have 128 GiB of memory according to their specs: https://aws.amazon.com/ec2/instance-types/m6g and https://aws.amazon.com/ec2/instance-types/m5

From the traceback, I can see that the timeout happens during authentication:

First it fails to authenticate with Spark's auth protocol in doBootstrap(AuthClientBootstrap.java:89): https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java#L99
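If you want to rule out the auth protocol itself, these are the Spark settings that control this handshake and the SASL fallback the traceback goes through. The values below are illustrative, not a recommendation; check what your EMR release sets by default before changing anything:

```properties
# spark-defaults.conf (illustrative values)
spark.authenticate                 true   # enable the RPC/shuffle auth handshake at all
spark.network.crypto.enabled       true   # try Spark's AES-based auth protocol first
spark.network.crypto.saslFallback  true   # fall back to SASL if the server rejects it
```

With `spark.network.crypto.enabled=false`, the client skips `AuthClientBootstrap` and goes straight to the SASL bootstrap, which can help narrow down which of the two handshakes is actually timing out.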

Bootstraps a {@link TransportClient} by performing authentication using Spark's auth protocol. This bootstrap falls back to using the SASL bootstrap if the server throws an error during authentication, and the configuration allows it. This is used for backwards compatibility with external shuffle services that do not support the new protocol.

Then it also fails in doBootstrap(SaslClientBootstrap.java:70): https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java#L54

  • Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The server should be setup with a {@link SaslRpcHandler} with matching keys for the given appId.
  • Performs SASL authentication by sending a token, and then proceeding with the SASL challenge-response tokens until we either successfully authenticate or throw an exception due to mismatch.
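Since the ~60 s timeout fires inside this handshake while an executor is fetching shuffle blocks, one pragmatic mitigation (an assumption on my part; I haven't verified it fixes the m6g behavior, only that it gives the fetch more headroom) is to raise the network timeout and retry budget:

```properties
# spark-defaults.conf (illustrative values; defaults are 120s / 3 / 5s)
spark.network.timeout        300s   # umbrella timeout for network interactions
spark.shuffle.io.maxRetries  10     # retries per block fetch before failing the task
spark.shuffle.io.retryWait   30s    # wait between retries
```

This doesn't explain why the m6g run behaves differently, but if the handshake is merely slow rather than broken, it can keep a transient timeout from failing the stage and forcing stages 4 and 5 to be recomputed.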