执行器内的内核共享内存

问题描述 投票:0回答:1

假设我们有 5 台工作机器。每个工作节点都有

16 cores
64 GB memory
的配置。所以总计
80 core
320 GB memory

现在我们可以配置如下属性:

  • 选项1:设置大型执行器

    • 每 1 台机器 1 个执行器,即。每个执行器有 63 GB 内存和 15 个核心(除去操作系统/开销等)
  • 选项2:为每个执行器设置5个核心

    • 每个执行器的核心数量:5
    • 每台机器的执行器数量:15核/5 = 3
    • 每个执行器的内存:63 GB / 3 个执行器 = 21 GB
    • 选项 1 与选项 2 的优缺点是什么?

问题 - 对于执行器需要高内存的情况(例如:在具有倾斜数据的列上进行分组),无论上面选择的方法如何,执行器中可用的内核都会共享执行器的内存(即在选项 1 中, 15个核心共享63GB内存,如果某个特定核心或任务需要的内存超过63GB/15),我们在Spark中如何处理这种情况?

scala apache-spark pyspark
1个回答
0
投票

选项1(大型执行器)意味着更简单的集群管理,只要您的任务可以有效地使用大型内存池,就可以提高数据局部性。 可以并行运行的任务更少(假设您的 Spark 作业读取 100k parquet 文件 - 一次只能完成 4-5 个任务;另一方面,如果您有 10 个大 parquet 文件要读取,这可能是一个更好的选择)

选项2(较小的执行器)具有更好的容错能力(重新启动失败的任务)并且可以更有效地使用资源。

动态分配允许Spark根据工作负载动态调整执行器的数量。您可以启用它并设置执行器的最小和最大数量:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Dynamic Allocation Example") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.initialExecutors", 2) \
    .config("spark.dynamicAllocation.minExecutors", 1) \
    .config("spark.dynamicAllocation.maxExecutors", 10) \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()

增加此值对于需要大量内存来进行 Java 堆之外的操作的任务来说是有益的:

spark = SparkSession.builder \
    .appName("Memory Overhead Example") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1024") \
    .getOrCreate()

自定义分区程序可以帮助您在分区之间更均匀地分配数据。虽然 PySpark 不允许像 Scala/Java 那样直接创建自定义分区器,但您可以通过操作数据来实现类似的效果。

partitioned_df = df.repartition("key")

广播变量用于在所有节点上保存数据的副本。当连接一个大的 DataFrame 和一个小的 DataFrame 时,这会很有用:

broadcast_df = spark.sparkContext.broadcast(small_df.collect())

现在,您可以在操作中访问“broadcast_df.value”来使用广播数据。

加盐涉及向密钥添加随机前缀以更均匀地分布数据。以下是如何在连接操作中实现加盐的示例:

large_df = large_df.withColumn("salted_key", F.concat(large_df["key"], F.lit("_"), (F.rand()*10).cast("int")))

salted_small_df = small_df.crossJoin(spark.range(0, 10).withColumnRenamed("id", "salt")).withColumn("salted_key", F.concat(small_df["key"], F.lit("_"), F.col("salt")))

joined_df = large_df.join(salted_small_df, "salted_key")
© www.soinside.com 2019 - 2024. All rights reserved.