我是 Spark 的初学者。
我在本地机器上运行 Spark,没有集群。
我开发了以下脚本。
目的是获得密钥“许可证”的唯一值。
一切都顺利超越界限
licenses_collected = rdd.map(lambda x: x["license"]).collect()
unique_licenses = rdd.map(lambda x: x["license"]).distinct()
但是当我尝试将
.distinct()
方法应用于映射的 rdd,然后应用 .collect()
方法时,会引发 Traceback。
这是引发错误的行
unique_licenses_collected = rdd.map(lambda x: x["license"]).distinct().collect()
有什么问题吗?
"""Question 0: Start Connection With Spark"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").\
appName("prima applicazione spark").\
config("spark.ui.port", "4040").\
getOrCreate()
"""Question 1: Load Dataset"""
df_spark = spark.read.json(path_json)
rdd = df_spark.rdd
rdd_collected = rdd.collect()
"""Question 2: Get the first two records"""
for line in rdd.take(2):
print(line)
print("\n\n")
"""Question 3: Get the name of the licenses"""
licenses_collected = rdd.map(lambda x: x["license"]).collect()
unique_licenses = rdd.map(lambda x: x["license"]).distinct()
unique_licenses_collected = rdd.map(lambda x: x["license"]).distinct().collect()
AttributeError:无法在
上获取属性“PySparkRuntimeError”
pyspark --version
version 3.4.2
python --version
Python 3.10.12
问题是由于驱动程序机器上内存不足引起的(本地主机,因为我没有集群)。
为了解决这个问题,我通过替换行将驱动器机器的内存从默认值1GB增加到2GB
spark = SparkSession.builder.master("local").\
appName("prima applicazione spark").\
config("spark.ui.port", "4040").\
getOrCreate()
有线条
spark = SparkSession.builder.master("local").\
appName("prima applicazione spark").\
config("spark.driver.memory", "2g").\
config("spark.ui.port", "4040").\
getOrCreate()