请原谅Pyspark NOOB问题。
我在PySpark中生成Spark数据帧的最后阶段如下:
indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)
def sparse_to_array(v):
v = DenseVector(v)
new_array = list([float(x) for x in v])
return new_array
sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id',sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000,col('id'))
df_bayes = df_bayes.select('id','KPvec').groupby('id').agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))]).alias("KPvec")).cache()
我正在尝试汇总一个代表一个热编码分类变量的稀疏向量。
在我的EMR群集上,这需要188秒才能完成。结果数据帧有约5000万行。然后,我尝试将此数据帧写入实木复合地板。
我尝试过:
df_bayes.write.format("parquet") \
.partitionBy("id") \
.bucketBy(500,"KPvec") \
.option("path", "s3://..."+"output.parquet") \
.saveAsTable("output")
和:
df_bayes.write.repartition(1500,col('id')).parquet("s3://..."+"output.parquet")
并且无需重新分区。
[在每种情况下,该工作都花费很长时间,并最终因ExecutorLostFailure失败(这是由于EMR运行了许多竞价型实例)。
尽管之前进行过缓存,但我怀疑其中许多步骤实际上与实木复合地板书写无关,而与我要求的计算步骤有关。
我怀疑是这种情况,因为如果我尝试计算数据框的尺寸,则会看到DAG可视化效果是:
重复的步骤以及作业失败前的约6GB随机写入,向我表明,我执行计算的方式效率很低。
而且,当我运行explain
时,我得到以下信息:
== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
+- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
+- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- Exchange hashpartitioning(id#1, 15000)
+- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
+- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
+- *(1) Project [KPvec#3, id#1]
+- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
有人能指出我在这里做错了什么吗?
先谢谢您。
因此,要回答我自己的问题以防他人受骗,解决方案是将要热编码的功能设置为set_set,然后使用CountVectorizor代替Spark ML的OneHotEncoder。
df.select('id','feature').groupby('id').agg(F.collect_set('feature').alias('feature'))
countModel = CountVectorizer().setInputCol("feature").setOutputCol("feature_vec").fit(df)
df = countModel.transform(df).select('id','KPvec')
然后您可以将其保存到镶木地板中。对我来说,这是非常快的。