我的输入数据是3000万行* 113列各种类型的数据。我只是想使用PySpark来增加数据中一列的值,但是它要花很多时间才能运行(有时会因OOM错误而崩溃)。
Spark无法处理此数据规模吗?
from pyspark.sql import functions as F
# output = "/output/local"
# my_input = "/dataset/a"
def my_compute_function(my_input, ctx):
schema_raw = my_input.dtypes
def type_for_string(string):
_simple_type_map = {
"string": StringType,
"bool": BooleanType,
"double": DoubleType,
"float": FloatType,
"tinyint": ByteType,
"int": IntegerType,
"bigint": LongType,
"smallint": ShortType,
"date": DateType
}
return_type = _simple_type_map.get(string, None)
if return_type is None:
if re.search("^decimal.*", string):
return DecimalType
else:
return NullType
return return_type
schema_formatted = list(map(lambda entry: StructField(entry[0], type_for_string(entry[1])(), True), schema_raw))
local_rows = my_input.collect()
returned_rows = []
for row in local_rows:
d = row.asDict()
d["history_id"] = d["history_id"] + 1
returned_rows += [d]
df = ctx.spark_session.createDataFrame(returned_rows, StructType(schema_formatted))
return df
无关:如果this合并,您可能会看到此字符串显示SparkSQL功能
让我们考虑此代码的本地PySpark等效项:
from pyspark.sql import functions as F
# output = "/output/spark"
# input = "/dataset/a"
def my_compute_function(my_input):
return my_input.withColumn("history_id", F.col("history_id") + F.lit(1).cast("integer"))
这有什么不同之处是没有将数据具体化到您的Spark驱动程序中,它强制将每一行数据从执行程序重新排序回驱动程序,以进行逐行处理。
此代码在我的群集中的6m28s
中成功完成。
Spark中的驱动程序具有有限的并行化功能,因为它们受到分配给它们的内核数量的限制。尽管您可以导入并使用PySpark中的multiprocessing
模块来实现more并行性,但是您并没有使用Spark的全部功能,后者无法让您更有效地并行化计算[[much。
相反,通过堆叠filter()
,groupBy()
或其他等效的SQL方法,您只需构建输出数据集的描述,然后请Spark为您构建。
在样本的数据子集上用collect()
构建上面的代码,例如,在我的集群中的2m38s
中执行1000行,而我上面编写的本机PySpark代码在2m1s
中执行。
可伸缩性
[如果您使用上面包含的collect()
运行代码,则可能会OOM,而完整数据规模上的PySpark代码将在6毫秒内执行。这表示数据规模增加了30,000倍,而时间仅增加了3倍。
您可能还会发现我对here的回答也很有用。