假设我们有 5 台工作机器。每个工作节点都有
16 cores
和64 GB memory
的配置。所以总计 80 core
和 320 GB memory
。
现在我们可以配置如下属性:
选项1:设置大型执行器
选项2:为每个执行器设置5个核心
问题 - 对于执行器需要高内存的情况(例如:在具有倾斜数据的列上进行分组),无论上面选择的方法如何,执行器中可用的内核都会共享执行器的内存(即在选项 1 中, 15个核心共享63GB内存,如果某个特定核心或任务需要的内存超过63GB/15),我们在Spark中如何处理这种情况?
选项1(大型执行器)意味着更简单的集群管理,只要您的任务可以有效地使用大型内存池,就可以提高数据局部性。 可以并行运行的任务更少(假设您的 Spark 作业读取 100k parquet 文件 - 一次只能完成 4-5 个任务;另一方面,如果您有 10 个大 parquet 文件要读取,这可能是一个更好的选择)
选项2(较小的执行器)具有更好的容错能力(重新启动失败的任务)并且可以更有效地使用资源。
动态分配允许Spark根据工作负载动态调整执行器的数量。您可以启用它并设置执行器的最小和最大数量:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Dynamic Allocation Example") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.initialExecutors", 2) \
.config("spark.dynamicAllocation.minExecutors", 1) \
.config("spark.dynamicAllocation.maxExecutors", 10) \
.config("spark.shuffle.service.enabled", "true") \
.getOrCreate()
增加此值对于需要大量内存来进行 Java 堆之外的操作的任务来说是有益的:
spark = SparkSession.builder \
.appName("Memory Overhead Example") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.memoryOverhead", "1024") \
.getOrCreate()
自定义分区程序可以帮助您在分区之间更均匀地分配数据。虽然 PySpark 不允许像 Scala/Java 那样直接创建自定义分区器,但您可以通过操作数据来实现类似的效果。
partitioned_df = df.repartition("key")
广播变量用于在所有节点上保存数据的副本。当连接一个大的 DataFrame 和一个小的 DataFrame 时,这会很有用:
broadcast_df = spark.sparkContext.broadcast(small_df.collect())
现在,您可以在操作中访问“broadcast_df.value”来使用广播数据。
加盐涉及向密钥添加随机前缀以更均匀地分布数据。以下是如何在连接操作中实现加盐的示例:
large_df = large_df.withColumn("salted_key", F.concat(large_df["key"], F.lit("_"), (F.rand()*10).cast("int")))
salted_small_df = small_df.crossJoin(spark.range(0, 10).withColumnRenamed("id", "salt")).withColumn("salted_key", F.concat(small_df["key"], F.lit("_"), F.col("salt")))
joined_df = large_df.join(salted_small_df, "salted_key")