Pyspark Py4j IllegalArgumentException,带有spark.createDataFrame和pyspark.ml.clustering

问题描述 投票:0回答:1

首先让我透露问题的全部背景,我将使用简化的MWE,在底部重新创建相同的问题。随时跳过我对设置的讨论,直接进入最后一部分。

原始问题中的演员:

  1. 从Amazon S3读取的Spark数据帧data,其列scaled_features最终是VectorAssembler操作后跟MinMaxScaler的结果。
  2. 像这样的PCA之后,由上述df列产生的spark dataframe列pca_features
mat = RowMatrix(data.select('scaled_features').rdd.map(list))
pc = mat.computePrincipalComponents(2)
projected = mat.multiply(pc).rows.map(lambda x: (x, )).toDF().withColumnRenamed('_1', 'pca_features')
  1. BisectingKMeans的两个实例适合上述数据框中的特征的两个实例:
kmeans_scaled = BisectingKMeans(featuresCol='scaled_features').setK(4).setSeed(1)
model1 = kmeans_scaled.fit(data)

kmeans_pca = BisectingKMeans(featuresCol='pca_features').setK(4).setSeed(1)
model2 = kmeans_pca.fit(projected)

问题:

虽然BisectingKMeans从我的第一个df顺利地适合scaled_features,但是在尝试适合投影特征时,由于以下原因而出错了>]

Py4JJavaError: An error occurred while calling o1413.fit.
: java.lang.IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types:
[struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

如您所见,Py4J抱怨我正在以某种结构类型传递数据,而该结构类型恰好是允许的类型列表中指定的第一个类型。

其他调试信息:

我的Spark正在运行2.4.0版

检查dtypes会产生:data.dtypes: [('scaled_features', 'vector')]projected.dtypes: [('pca_features', 'vector')]。两个数据框的架构也相同,仅打印一个以供参考:

root
 |-- scaled_features: vector (nullable = true)

重新创建错误(MWE):

事实证明,可以通过从一些Vector创建一个简单的数据帧来重新创建相同的错误(我原来的dfs中的列也是VectorType的:]

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.clustering import BisectingKMeans

test_data = spark.createDataFrame([Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
    Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
    Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0]))])

kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)

最后一行导致我在原始设置中遇到的相同错误。

任何人都可以解释此错误并提出纠正方法吗?

首先让我透露问题的全部背景,我将使用简化的MWE,在底部重新创建相同的问题。随意跳过我对设置的讨论,直接进入...

python apache-spark pyspark apache-spark-mllib py4j
1个回答
0
投票

经过几天的调查,指出了问题的(相当令人尴尬的)原因:

© www.soinside.com 2019 - 2024. All rights reserved.