数据文件(CSV)描述:
我运行以下代码:
###imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window as w
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
##start session without off-heap
spark = SparkSession.builder.\
config("spark.dynamicAllocation.enabled",False).\
config("spark.executor.cores","2").\
config("spark.executor.instances","1").\
config("spark.executor.memory","500m").\
config("spark.sql.adaptive.enabled", False).\
config("spark.sql.files.maxPartitionBytes","13g").\
getOrCreate()
####read in a 1.9 GiB Serialized CSV
df_covid = spark.read.csv("gs://xxxxxxxxxxxx.appspot.com/spark_datasets/covid13g.csv",
header=True, inferSchema=False)
df_covid.cache()
######check the number of partitions
df_covid.rdd.getNumPartitions() #output 2
###To ensure filters don't get pushed down
df_covid = df_covid.withColumn("_catted", F.concat_ws('',*df_covid.columns))
######Apply filter
df_covid = df_covid.filter(F.length(df_covid._catted) > 20)
######count post filter rows
df_covid.count()
作业成功完成,以下是 Spark Web UI 中的执行程序选项卡的快照
我不明白两件事:
对于第一季度,如果您深入了解 Spark 内存管理概述:
Spark 中的内存使用主要属于两类之一:执行和存储。执行内存是指用于洗牌、连接、排序和聚合中的计算的内存,而存储内存是指用于跨集群缓存和传播内部数据的内存。并非所有操作都会使用 Spark 中的执行内存。它通常用于短期对象和数据混洗等计算。因此,即使您使用连接和过滤,也不会使用执行内存。
1. Read a CSV
2. Repartition to bring together the keys on the same executor
3. Add a transformation column (let us say the percentile within each key by which you repartitioned)
4. (optional) - cache this data
5. save the CSV to the disk
使用的记忆: