Optimizing the sum of sparse vectors in Spark (and saving the result to Parquet)


Please forgive a PySpark noob question.

The final stage of how I generate my Spark DataFrame in PySpark looks like this:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType

# Index the categorical column and one-hot encode it into a sparse vector column.
indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)

# UDF that densifies the sparse vector and returns it as a plain list of floats.
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id', sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000, col('id'))

# Sum the one-hot arrays element-wise per id; len(kids) is the vector length (~52k categories).
df_bayes = df_bayes.select('id', 'KPvec').groupby('id').agg(
    F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))]).alias("KPvec")
).cache()

I am trying to sum up sparse vectors that represent a one-hot-encoded categorical variable.
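As a toy illustration of what this aggregation is meant to produce (made-up data, with a 3-element vector standing in for the real ~52k categories), each row carries a one-hot array and the per-id sum gives category counts:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [("a", [1.0, 0.0, 0.0]), ("a", [0.0, 1.0, 0.0]), ("b", [0.0, 1.0, 0.0])],
    ["id", "KPvec"])
n = 3  # toy vector length; in the real job this is len(kids)
toy.groupby("id").agg(
    F.array(*[F.sum(F.col("KPvec")[i]) for i in range(n)]).alias("KPvec")
).show(truncate=False)
# id "a" -> [1.0, 1.0, 0.0], id "b" -> [0.0, 1.0, 0.0]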

On my EMR cluster this takes 188 seconds to complete, and the resulting DataFrame has about 50 million rows. I then try to write this DataFrame to Parquet.

I have tried:

df_bayes.write.format("parquet") \
    .partitionBy("id") \
    .bucketBy(500, "KPvec") \
    .option("path", "s3://..."+"output.parquet") \
    .saveAsTable("output")

and:

df_bayes.repartition(1500, col('id')).write.parquet("s3://..."+"output.parquet")

as well as without any repartitioning.

In each case the job takes a very long time and eventually fails with an ExecutorLostFailure (which is down to EMR running a lot of spot instances).

Here is the Spark DAG Visualization

Despite the earlier caching, I suspect that many of these steps are not actually part of the Parquet write at all, but are the computation steps I asked for earlier.

I suspect this is the case because, if I simply try to compute the dimensions of the DataFrame, the DAG visualization I see is:

[Spark DAG visualization for the dimension check job]
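The dimension check itself is nothing more than something along these lines (counting is an action, so unless the cache has already been materialized it re-triggers the whole aggregation):

print((df_bayes.count(), len(df_bayes.columns)))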

The repeated steps, along with roughly 6 GB of shuffle writes before the job fails, tell me that the way I am performing this computation is very inefficient.

Also, when I run explain I get the following:

== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
   +- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
            +- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
               +- Exchange hashpartitioning(id#1, 15000)
                  +- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
                     +- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
                        +- *(1) Project [KPvec#3, id#1]
                           +- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>

Can anyone point out what I am doing wrong here?

Thanks in advance.

python apache-spark amazon-emr parquet pyspark-dataframes
1 Answer

So, to answer my own question in case anyone else runs into this: the solution was to gather the feature values to be one-hot encoded into a set per id with collect_set, and then use CountVectorizer instead of Spark ML's OneHotEncoder.

from pyspark.ml.feature import CountVectorizer
import pyspark.sql.functions as F

# Collect the raw feature values into one set per id, then count them in a single pass.
df = df.select('id', 'feature').groupby('id').agg(F.collect_set('feature').alias('feature'))

countModel = CountVectorizer().setInputCol("feature").setOutputCol("feature_vec").fit(df)
df = countModel.transform(df).select('id', 'feature_vec')

You can then save that to Parquet. For me this was very fast.
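The final write is then just a plain Parquet write along these lines (keeping the elided S3 prefix exactly as in the question):

df.write.parquet("s3://..."+"output.parquet")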
