Exporting a PySpark dataframe to Azure Data Lake takes forever

Problem Description

The following code runs fine with standalone PySpark 2.4 on Mac OS (Python 3.7) when the input data is small (about 6 GB). However, when I run the code on an HDInsight cluster (HDI 4.0, i.e. Python 3.5 and PySpark 2.4; 4 worker nodes, each with 64 cores and 432 GB of RAM; 2 head nodes, each with 4 cores and 28 GB of RAM) with the larger input data (169 GB), the last step, which writes the data to the data lake, took forever (I killed it after 24 hours of execution). Given that HDInsight is not popular in the cloud-computing community, I could only find posts complaining about slow writes of dataframes to S3. Some suggested repartitioning the dataset, which I did, but it did not help.

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT 
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword", 
        regexp_extract(
            "menu_name", 
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)", 
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items", 
        minSupport=0, 
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "items", 
        array_remove("items", "-999")
    )
    frequency = frequency.filter(size(col("items")) > 0)
    frequency = frequency.orderBy(asc("items"), desc("freq"))
    frequency = frequency.dropDuplicates(["items"])
    frequency = frequency.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.items)
    )
    frequency.createOrReplaceTempView("frequency")
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    lift.createOrReplaceTempView("lift")
    result = spark.sql(
        """
        SELECT lift.antecedent, freq AS frequency, lift
        FROM lift
        INNER JOIN frequency
        ON lift.antecedent = frequency.antecedent
        """
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path="...",   # 169.1 GB of txt
        beer_path="...",    # 4.9 GB of csv
        corpus_path="...",  # 210 KB of csv
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

I first thought the problem was caused by the file format, parquet. However, when I tried csv, I ran into the same issue. I tried result.count() to see how many rows the table result has; getting the row count took just as long as writing the data to the data lake. When a large dataset is joined with a small one, a broadcast hash join is recommended over the default sort-merge join. I figured it was worth a try, because a preliminary study on a smaller sample told me that the row count of frequency is roughly 0.09% of the row count of lift (see the query below if you have trouble keeping track of frequency and lift).

SELECT lift.antecedent, freq AS frequency, lift
FROM lift
INNER JOIN frequency
ON lift.antecedent = frequency.antecedent
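
As a side note, the same broadcast behavior can also be requested through the DataFrame API with pyspark.sql.functions.broadcast instead of a SQL-style hint. A minimal sketch, assuming the lift and frequency dataframes from the code above:

from pyspark.sql.functions import broadcast

# Mark the small dataframe (frequency) for broadcasting so that Spark ships it
# to every executor and skips the shuffle required by a sort-merge join.
result = lift.join(broadcast(frequency), ["antecedent"], "inner")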
With this in mind, I modified my code:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword",
        regexp_extract(
            "menu_name",
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)",
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items",
        minSupport=0,
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "antecedent",
        array_remove("items", "-999")
    )
    frequency = frequency.drop("items")
    frequency = frequency.filter(size(col("antecedent")) > 0)
    frequency = frequency.orderBy(asc("antecedent"), desc("freq"))
    frequency = frequency.dropDuplicates(["antecedent"])
    frequency = frequency.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.antecedent)
    )
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent",
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    result = lift.join(
        frequency.hint("broadcast"),
        ["antecedent"],
        "inner"
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path="...",   # 169.1 GB of txt
        beer_path="...",    # 4.9 GB of csv
        corpus_path="...",  # 210 KB of csv
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

The code ran well on my Mac OS with the same sample data and, as expected, took less time (34 seconds vs. 26 seconds). I then decided to run the code on HDInsight with the complete dataset. In the last step, writing the data to the data lake, the job failed and I was told:

Job cancelled because SparkContext was shut down

I am relatively new to big data and do not understand this error. Posts on the internet say there can be many reasons behind it. Whatever approach I should take, how can I optimize my code so that I get the desired output in the data lake within an acceptable amount of time?

pyspark bigdata hdinsight
1 Answer
I would try a few things, ordered by the amount of effort they require:

  • Check that the ADL storage is in the same region as the HDInsight cluster.
  • Add calls to df = df.cache() after heavy computations, or even write the dataframes to cache storage between those computations and read them back from it (see the first sketch after this list).
  • Replace the UDFs with "native" Spark code, since UDFs are one of the performance bad practices of Spark (see the second sketch after this list).
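
A minimal sketch of the caching suggestion, applied to the aggregated orders dataframe right before the FPGrowth fit; the intermediate path is hypothetical:

# Keep the aggregated orders in memory/disk so that FPGrowth does not recompute
# the full lineage (CSV reads, joins, regex extraction) on every pass.
orders = orders.cache()

# Alternatively, materialize the intermediate result in the data lake and read it
# back, which truncates the lineage entirely (the path below is hypothetical).
orders.write.mode("overwrite").parquet("intermediate/orders_items.parquet")
orders = spark.read.parquet("intermediate/orders_items.parquet")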
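
And a sketch of replacing the two UDFs from the question with built-in functions, assuming antecedent and consequent are arrays of strings as in the code above:

from pyspark.sql.functions import concat_ws, sort_array, array, lit, col

# "|".join(sorted(x)) as native Spark: sort the array, then concatenate its elements.
lift = lift.withColumn("antecedent", concat_ws("|", sort_array(col("antecedent"))))

# x == ["-999"] as native Spark: compare against a literal single-element array.
lift = lift.filter(col("consequent") == array(lit("-999")))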