round function gives an error in PySpark when used with a udf

from pyspark.sql.functions import *
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

def data_preparation(df):

    # UDF that takes the first element of the vector and rounds it to 3 decimal places
    unlist = udf(lambda x: round(float(list(x)[0]), 3), FloatType())
    # Iterating over columns to be scaled
    for i in ["event"]:
        # VectorAssembler Transformation - Converting column to vector type
        assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

        # MinMaxScaler Transformation
        scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

        # Pipeline of VectorAssembler and MinMaxScaler
        pipeline = Pipeline(stages=[assembler, scaler])

        # Fitting pipeline on dataframe
        df = (pipeline.fit(df).transform(df)
              .withColumn(i + "_Scaled", unlist(i + "_Scaled"))
              .drop(i + "_Vect"))

    return df

In the unlist udf in the code snippet above, I am trying to take the first element out of the list and round it to 3 decimal places. But when I use the function, it gives me an error like this:

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pysparkx/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
    for item in iterator:
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-85-a4273b6bc9ab>", line 17, in <lambda>
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1234, in round
    return Column(sc._jvm.functions.round(_to_java_column(col), scale))
AttributeError: 'NoneType' object has no attribute '_jvm'

I tried doing the rounding operation separately, but then it gives me errors at a later stage of the program. I am just looking for the cause of this problem.

python pyspark rounding
1 Answer
0 votes

The problem is in this line -

from pyspark.sql.functions import *

Instead, try -

import pyspark.sql.functions as f
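
With the wildcard import, Python's builtin round is shadowed by pyspark.sql.functions.round, which builds a Column expression through the driver's JVM gateway (sc._jvm). The lambda inside a udf runs on a Python worker, where no SparkContext exists, so sc is None and the call fails with the 'NoneType' object has no attribute '_jvm' error shown in the traceback. A minimal sketch of the corrected udf under the namespaced import:

import pyspark.sql.functions as f
from pyspark.sql.types import FloatType

# round now resolves to Python's builtin again, so it runs fine on the worker
unlist = f.udf(lambda x: round(float(list(x)[0]), 3), FloatType())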
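
If the wildcard import has to stay for other reasons, the shadowing can also be bypassed explicitly through the builtins module; a sketch under that assumption:

import builtins

# call the builtin by its qualified name so pyspark.sql.functions.round does not intercept it
unlist = udf(lambda x: builtins.round(float(list(x)[0]), 3), FloatType())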