当输入数据较小(大约 6 GB)时,以下代码在 Mac OS(Python 3.7)上的 PySpark 2.4 独立版本上运行得很好。但是,当我在 HDInsight 群集(HDI 4.0,即 Python 3.5、PySpark 2.4;4 个工作节点,每个有 64 个内核和 432 GB RAM;2 个头节点,每个有 4 个内核和 28 GB RAM)上用较大的输入数据(169 GB)运行该代码时,最后一步——将数据写入数据湖——花了很长时间(执行 24 小时后我把作业杀掉了)。鉴于 HDInsight 在云计算社区中并不流行,我只能参考那些抱怨将数据帧写入 S3 速度慢的帖子。有人建议对数据集重新分区,我照做了,但没有帮助。
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
# Pin both the executors and the driver to the cluster's Python 3.5
# interpreter so PySpark does not mix Python versions across nodes.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"
def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    """Mine beer-related association rules from order data with FP-Growth.

    Reads raw orders, tags beer line items with a sentinel menu name,
    extracts food keywords from menu names, fits an FP-Growth model, and
    writes ``(antecedent, frequency, lift)`` rows as Parquet.

    Parameters
    ----------
    order_path : str
        Headerless CSV of raw orders (columns addressed by position).
    beer_path : str
        CSV (with header) listing the order lines that are beer.
    corpus_path : str
        CSV (with header) whose ``Food_Name`` column provides keywords.
    output_path : str
        Destination path for the Parquet result.
    FREQ_THRESHOLD : int
        Keep itemsets whose ``freq`` is strictly greater than this.
    LIFT_THRESHOLD : int
        Keep rules whose ``lift`` is strictly greater than this.
    """
    import re  # local import: only needed to escape keywords for the regex

    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")

    print("Working on Checkpoint1...")
    # Headerless CSV: columns are addressed positionally (_c14, _c31, _c32).
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    # Beer rows get the sentinel menu name '-999' so they survive keyword
    # extraction and can be recognized later in the rule consequents.
    beer = spark.sql(
        """
        SELECT
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    # Replace the menu name with the sentinel wherever the line item is beer.
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")

    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    # Collect the keyword list to the driver (the corpus is tiny, ~210 KB).
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    # Escape each keyword so names containing regex metacharacters cannot
    # corrupt the alternation; use raw strings for the \s escapes.
    # NOTE(review): the leading (?=^|\s) is a lookAHEAD, so it can only match
    # keywords at the very start of menu_name; a lookbehind / anchor group
    # was probably intended -- confirm against expected matches.
    pattern = r"(?=^|\s)(" + "|".join(re.escape(k) for k in keywords) + r")(?=\s|$)"
    orders = orders.withColumn(
        "keyword",
        regexp_extract("menu_name", pattern, 0)
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    # One row per order: the distinct set of keywords it contains.
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")

    print("Working on Checkpoint3...")
    # NOTE(review): minSupport=0 / minConfidence=0 force FP-Growth to emit
    # every itemset and rule, which explodes combinatorially on large inputs
    # and is the likely bottleneck behind the slow final write -- consider
    # raising these thresholds instead of filtering afterwards.
    fpGrowth = FPGrowth(
        itemsCol="items",
        minSupport=0,
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")

    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    # Drop the beer sentinel from each itemset, then discard now-empty sets.
    frequency = frequency.withColumn(
        "items",
        array_remove("items", "-999")
    )
    frequency = frequency.filter(size(col("items")) > 0)
    # Keep the highest-frequency row per distinct itemset.
    frequency = frequency.orderBy(asc("items"), desc("freq"))
    frequency = frequency.dropDuplicates(["items"])
    # Canonical pipe-joined string key so frequency and lift can be joined.
    frequency = frequency.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.items)
    )
    frequency.createOrReplaceTempView("frequency")
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    # Keep only rules whose consequent is exactly the beer sentinel.
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    lift.createOrReplaceTempView("lift")
    result = spark.sql(
        """
        SELECT lift.antecedent, freq AS frequency, lift
        FROM lift
        INNER JOIN frequency
        ON lift.antecedent = frequency.antecedent
        """
    )
    print("Checkpoint4 Completed!")

    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")
def main():
    """Entry point: wire the concrete dataset paths into work().

    The original paste used pseudo-code placeholders ("169.1 GB of txt"),
    which is a syntax error; the sizes are kept here as comments.
    """
    work(
        order_path="path/to/orders",            # 169.1 GB of txt
        beer_path="path/to/beer.csv",           # 4.9 GB of csv
        corpus_path="path/to/corpus.csv",       # 210 KB of csv
        output_path="final_result.parquet",
    )


if __name__ == "__main__":
    main()
我首先认为这是由文件格式parquet引起的。但是,当我尝试使用csv时,遇到了同样的问题。我试过result.count()
来查看表result
有多少行。仅仅得到行数就花了很长时间,和将数据写入数据湖一样慢。有帖子建议,当大型数据集与小型数据集连接时,应使用 broadcast hash join 而不是默认的 sort-merge join。我认为值得一试,因为初步研究中较小的样本告诉我 frequency
的行数大约是lift
的行数的0.09%(如果您难以跟踪frequency
和lift
,请参见下面的查询)。
SELECT lift.antecedent, freq AS frequency, lift
FROM lift
INNER JOIN frequency
ON lift.antecedent = frequency.antecedent
考虑到这一点,我修改了代码:
# Second version of the script: identical pipeline, except that the final
# join broadcasts the much smaller `frequency` side (see the hint below).
# Reconstructed with proper indentation -- the paste had collapsed the whole
# script onto two lines, which is not runnable Python.
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os

# Pin executors and driver to the cluster's Python 3.5 interpreter.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"


def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    """Mine beer-related association rules with FP-Growth.

    Same pipeline as version 1, but the final join broadcasts the small
    `frequency` frame instead of relying on the default sort-merge join.
    """
    import re  # local import: only needed to escape keywords for the regex

    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")

    print("Working on Checkpoint1...")
    # Headerless CSV: columns are addressed positionally (_c14, _c31, _c32).
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    # Beer rows get the sentinel menu name '-999'.
    beer = spark.sql(
        """
        SELECT
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    # Replace the menu name with the sentinel wherever the line item is beer.
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")

    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    # Collect the tiny keyword list (~210 KB) to the driver.
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    # Escape keywords so regex metacharacters cannot corrupt the pattern;
    # raw strings for the \s escapes.  NOTE(review): leading (?=^|\s) is a
    # lookahead and only matches at the start of menu_name -- confirm intent.
    pattern = r"(?=^|\s)(" + "|".join(re.escape(k) for k in keywords) + r")(?=\s|$)"
    orders = orders.withColumn(
        "keyword",
        regexp_extract("menu_name", pattern, 0)
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    # One row per order: the distinct set of keywords it contains.
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")

    print("Working on Checkpoint3...")
    # NOTE(review): minSupport=0 / minConfidence=0 emit every itemset/rule
    # and are the likely scalability bottleneck -- consider raising them.
    fpGrowth = FPGrowth(
        itemsCol="items",
        minSupport=0,
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")

    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    # Strip the beer sentinel; the cleaned array becomes the join key base.
    frequency = frequency.withColumn(
        "antecedent",
        array_remove("items", "-999")
    )
    frequency = frequency.drop("items")
    frequency = frequency.filter(size(col("antecedent")) > 0)
    # Keep the highest-frequency row per distinct antecedent.
    frequency = frequency.orderBy(asc("antecedent"), desc("freq"))
    frequency = frequency.dropDuplicates(["antecedent"])
    # Canonical pipe-joined string key shared with the lift frame.
    frequency = frequency.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.antecedent)
    )
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    # Keep only rules whose consequent is exactly the beer sentinel.
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    # frequency is ~0.09% the size of lift, so broadcast it to every
    # executor instead of shuffling both sides (default sort-merge join).
    result = lift.join(
        frequency.hint("broadcast"),
        ["antecedent"],
        "inner"
    )
    print("Checkpoint4 Completed!")

    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")


def main():
    """Entry point: wire the concrete dataset paths into work()."""
    work(
        order_path="path/to/orders",            # 169.1 GB of txt
        beer_path="path/to/beer.csv",           # 4.9 GB of csv
        corpus_path="path/to/corpus.csv",       # 210 KB of csv
        output_path="final_result.parquet",
    )


if __name__ == "__main__":
    main()
该代码在我的Mac OS上使用相同的示例数据运行得很好,并且所期望的时间更少(34秒对26秒)。然后,我决定将代码与完整的数据集一起运行到HDInsight。在将数据写入数据湖的最后一步中,任务失败,并被告知由于SparkContext已关闭,作业被取消。我对大数据比较陌生,不了解这种方法。互联网上的帖子说,背后可能有很多原因。无论我应该使用哪种方法,如何优化我的代码,以便可以在可承受的时间内在数据湖中获得所需的输出?
df = df.cache()
的调用来缓存中间结果,甚至尝试在这些计算步骤之间先将数据帧写入高速缓存存储、然后再从中读取。