def data_preparation(df, columns=("event",)):
    """Min-max scale each listed column and store the result as a rounded float.

    For every column name ``c`` in *columns*:

    - a ``VectorAssembler`` + ``MinMaxScaler`` pipeline is fitted on *df*;
    - the pipeline writes ``c + "_Scaled"``;
    - the one-element scaled vector is unwrapped to a float rounded to three
      decimals;
    - the temporary ``c + "_Vect"`` column is dropped.

    Args:
        df: Spark DataFrame containing the columns to scale.
        columns: Names of the columns to scale. Defaults to ``("event",)``,
            the single column this function originally handled.

    Returns:
        The transformed DataFrame with one ``<column>_Scaled`` column per input.
    """
    import builtins

    # A wildcard `from pyspark.sql.functions import *` replaces the built-in
    # `round` with Spark's column function. That function needs a SparkContext
    # and raises "'NoneType' object has no attribute '_jvm'" inside a Python
    # worker. Bind the built-in explicitly so the UDF does plain float rounding.
    py_round = builtins.round

    def _first_rounded(vec):
        # The assembled vector has exactly one element (one input column).
        # A null vector is passed through as null instead of raising.
        if vec is None:
            return None
        return py_round(float(vec[0]), 3)

    unlist = udf(_first_rounded, FloatType())

    for col_name in columns:
        vect_col = col_name + "_Vect"
        scaled_col = col_name + "_Scaled"
        # VectorAssembler: MinMaxScaler only accepts vector-typed input columns.
        assembler = VectorAssembler(inputCols=[col_name], outputCol=vect_col)
        scaler = MinMaxScaler(inputCol=vect_col, outputCol=scaled_col)
        pipeline = Pipeline(stages=[assembler, scaler])
        df = (
            pipeline.fit(df)
            .transform(df)
            .withColumn(scaled_col, unlist(scaled_col))
            .drop(vect_col)
        )
    return df
在上面代码片段的 unlist UDF 中,我试图取出列表的第一个元素,并将其四舍五入到小数点后 3 位。但是当我使用该函数时,出现了如下错误:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
for obj in iterator:
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
for item in iterator:
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
return lambda *a: f(*a)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
return f(*args, **kwargs)
File "<ipython-input-85-a4273b6bc9ab>", line 17, in <lambda>
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1234, in round
return Column(sc._jvm.functions.round(_to_java_column(col), scale))
AttributeError: 'NoneType' object has no attribute '_jvm'
我试过单独进行舍入操作,但在程序的后续阶段仍然报错。我只是想找出这个问题的原因。
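问题出在下面这一行。通配符导入会用 pyspark.sql.functions.round 覆盖 Python 内置的 round,因此 UDF 在工作进程中调用的其实是 Spark 的列函数。该函数需要 SparkContext,而工作进程中没有,于是报出 'NoneType' object has no attribute '_jvm':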
from pyspark.sql.functions import *
应改为带命名空间的导入。之后通过 f.round、f.udf 等调用 Spark 函数,内置的 round 就不会再被覆盖:
import pyspark.sql.functions as f