我正在使用Spark MLLib在AWS EMR上执行K-means集群。数据集大约是10 ^ 6行和9个特征列。我正在使用的实例大小具有8vCPU和32GB内存。
[我希望随着群集上节点数量的增加,Spark可以提高性能(减少执行时间),但是结果却相反。
与使用单个工作程序节点相比,使用更多工作程序节点/实例可以获得WORSE性能(更高的执行时间)。对于5个,10个和15个工作节点的集群,我得到了相同的结果。随着节点数量的增加,性能会下降。我试图改变分区(spark.sql.shuffle.partitions),并使用了执行程序核心,执行程序数量和执行程序内存的各种配置。
我的代码在下面(执行者的数量是10个工作节点的数量:]
spark-shell --executor-cores 3 num-executors 20 --executor-memory 10G
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}
sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc)
//dataset is loaded in from Phoenix table and set as featureDf6
//dataset is made up of all numerical values (DOUBLE)
val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)
val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)
val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)
我发现问题的原因是Spark从Phoenix / HBase读取数据的方法。当我将数据集直接上传到Spark时,结果符合预期,并且随着节点的增加,执行时间减少了。我将发布另一个问题,以识别在读取Phoenix的过程中的错误。