为什么我的简单PySpark代码需要这么长时间才能运行?

问题描述 投票:0回答:1

我的输入数据是3000万行* 113列各种类型的数据。我只是想使用PySpark来增加数据中一列的值,但是它要花很多时间才能运行(有时会因OOM错误而崩溃)。

Spark无法处理此数据规模吗?

from pyspark.sql import functions as F


# output = "/output/local"
# my_input = "/dataset/a"
def my_compute_function(my_input, ctx):
    schema_raw = my_input.dtypes

    def type_for_string(string):
        _simple_type_map = {
            "string": StringType,
            "bool": BooleanType,
            "double": DoubleType,
            "float": FloatType,
            "tinyint": ByteType,
            "int": IntegerType,
            "bigint": LongType,
            "smallint": ShortType,
            "date": DateType
        }

        return_type = _simple_type_map.get(string, None)
        if return_type is None:
            if re.search("^decimal.*", string):
                return DecimalType
            else:
                return NullType
        return return_type

    schema_formatted = list(map(lambda entry: StructField(entry[0], type_for_string(entry[1])(), True), schema_raw))

    local_rows = my_input.collect()

    returned_rows = []
    for row in local_rows:
        d = row.asDict()
        d["history_id"] = d["history_id"] + 1
        returned_rows += [d]

    df = ctx.spark_session.createDataFrame(returned_rows, StructType(schema_formatted))
    return df

无关:如果this合并,您可能会看到此字符串显示SparkSQL功能

pyspark pyspark-sql pyspark-dataframes
1个回答
0
投票

使用本地PySpark

让我们考虑此代码的本地PySpark等效项:

from pyspark.sql import functions as F


# output = "/output/spark"
# input = "/dataset/a"
def my_compute_function(my_input):
    return my_input.withColumn("history_id", F.col("history_id") + F.lit(1).cast("integer"))

这有什么不同之处是没有将数据具体化到您的Spark驱动程序中,它强制将每一行数据从执行程序重新排序回驱动程序,以进行逐行处理。

此代码在我的群集中的6m28s中成功完成。

驱动程序与执行程序

Spark中的驱动程序具有有限的并行化功能,因为它们受到分配给它们的内核数量的限制。尽管您可以导入并使用PySpark中的multiprocessing模块来实现more并行性,但是您并没有使用Spark的全部功能,后者无法让您更有效地并行化计算[[much。

DataFrames的工作方式以及为何如此之快的原因是,您的PySpark方法调用仅描述了[[how,您希望转换数据,而无需在效率低下的Python运行时中为您完成这项工作。

相反,通过堆叠filter()groupBy()或其他等效的SQL方法,您只需构建输出数据集的描述,然后请Spark为您构建。

强制执行collect()方法可将这种并行性从执行者转移到单个进程,该进程甚至可能没有足够的资源来处理规模。

在样本的数据子集上用collect()构建上面的代码,例如,在我的集群中的2m38s中执行1000行,而我上面编写的本机PySpark代码在2m1s中执行。

可伸缩性

[如果您使用上面包含的collect()运行代码,则可能会OOM,而完整数据规模上的PySpark代码将在6毫秒内执行。这表示数据规模增加了30,000倍,而时间仅增加了3倍。

由于必须执行逻辑转换,因此在PySpark中编写代码可能会更加困难,但是它对可伸缩性产生了巨大影响。

您可能还会发现我对here的回答也很有用。

© www.soinside.com 2019 - 2024. All rights reserved.