I'm working with Spark 2.1.1 in a Jupyter notebook, and I'm trying to use MinMaxScaler. My first steps are:
from pyspark.sql import SQLContext
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
batch1 = sqlContext.sql("SELECT field1 FROM hive_table limit 10000")
As described in the documentation, in order to use MinMaxScaler my field has to be a dense vector (Vectors.dense). So this is what I do next:
batch2 = batch1.rdd.map(lambda row: Vectors.dense(row.field1))
After this step, I can't do anything with batch2. For example, if I do:
for record in batch2.collect():
    print(record)
I get this kind of error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 25.0 failed 1 times, most recent failure: Lost task 30.0 in stage 25.0 (TID 389, localhost, executor driver): java.lang.ClassCastException
What am I doing wrong?
Probably your column row.field1 is not an array. [edit] But Vectors.dense also accepts a single value as input, so I suspect your column field1 contains some non-numeric records.
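A quick way to confirm that is to try casting each value to float before building the dense vectors. This is a plain-Python sketch (the helper name to_float_or_none is made up for illustration; in Spark you would apply the same check in a filter on the RDD):

```python
def to_float_or_none(value):
    """Return the value as a float, or None if it cannot be cast."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# Mixed sample: two numeric strings, one bad record, one null.
sample = ["123", "23", "oops", None]
bad = [v for v in sample if to_float_or_none(v) is None]
print(bad)  # the records that would break Vectors.dense
```

Any record surfaced this way would cause a cast failure like the ClassCastException above when Spark tries to serve the collected results.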
See the example below:
df = spark.createDataFrame([(123, 25), (23, 22), (2, 20)], ['c1', 'c2'])
df.show()
df.printSchema()
import pyspark.sql.functions as f
df2 = df.select(f.array('*')).toDF('arr')
df2.show()
from pyspark.ml.linalg import Vectors
rdd = df2.rdd.map(lambda x: Vectors.dense(x.arr))
for record in rdd.collect():
    print(record)
+---+---+
| c1| c2|
+---+---+
|123| 25|
| 23| 22|
| 2| 20|
+---+---+
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
+---------+
| arr|
+---------+
|[123, 25]|
| [23, 22]|
| [2, 20]|
+---------+
[123.0,25.0]
[23.0,22.0]
[2.0,20.0]
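Once the dense vectors are built, MinMaxScaler rescales each feature onto a target range (default [0, 1]) via (x - x_min) / (x_max - x_min). A plain-Python sketch of that formula for a single column (not Spark's actual implementation, just the arithmetic it performs per feature):

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale a list of numbers linearly onto [lo, hi]."""
    x_min, x_max = min(values), max(values)
    if x_max == x_min:
        # A constant feature has no spread; map it to the midpoint of the range.
        return [0.5 * (lo + hi)] * len(values)
    return [lo + (v - x_min) * (hi - lo) / (x_max - x_min) for v in values]

# First component of the vectors above: 123.0, 23.0, 2.0
print(min_max_scale([123.0, 23.0, 2.0]))  # [1.0, 0.173..., 0.0]
```

In Spark you would instead fit a MinMaxScaler on the DataFrame column holding the dense vectors and call transform, which applies this rescaling to every component at once.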