如何在Spark中识别最佳混洗分区数量

问题描述 投票:0回答:1

我正在EMR中运行一个火花结构化的流作业(每天都会反弹)。执行几个小时后,我的应用程序中出现OOM错误并被杀死。以下是我的配置和Spark SQL代码。我是Spark的新手,需要您的宝贵意见。

EMR具有10个实例,具有16个核心和64GB内存。

火花提交参数:

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

作业正在以30秒钟的间隔从Kafka读取微批输入。每批读取的平均行数为90k。

  spark.streaming.kafka.maxRatePerPartition: 4500
  spark.streaming.stopGracefullyOnShutdown: true
  spark.streaming.unpersist: true
  spark.streaming.kafka.consumer.cache.enabled: true
  spark.hadoop.fs.s3.maxRetries: 30 
  spark.sql.shuffle.partitions: 2001

Spark SQL聚合代码:

dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
            .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
            .select(NAME,DEPS)
            .map((row) -> {
              Map<String, Object> map = Maps.newHashMap();
              map.put(NAME, row.getString(0));
              map.put(DEPS, row.getString(1));
              return new KryoMapSerializationService().serialize(map);
            }, Encoders.BINARY());

来自驱动程序的一些日志:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

顺便说一下,我在我的forEachBatch代码中使用collectasList

  List<Event> list = dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .collectAsList();
java apache-spark apache-spark-sql yarn spark-structured-streaming
1个回答
0
投票

使用这些设置,可能会导致您自己的问题。

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

您基本上是在这里创建多余的容器,因此必须在它们之间进行洗牌。相反,应该从10个执行程序,15个内核,60g内存开始。如果这有效,那么您可以尝试一下这些以尝试并优化性能。我通常尝试在每个步骤中将容器分成两半(但是从spark 2.0开始,我也不需要这样做)。

让Spark SQL的默认值保持为200。对它的分解越多,Spark进行计算随机数的数学就越多。如果有的话,我会尝试使用与执行程序相同的并行度,所以在这种情况下,只有10个。当2.0发布时,这就是调整配置单元查询的方式。使工作变得复杂,难以分手,这将使所有工作负担都增加了。

使用数据集和编码通常也不如直接使用DataFrame操作那样好。我发现将其考虑到数据帧操作的性能有了很大提升。

© www.soinside.com 2019 - 2024. All rights reserved.