pyspark udf正在分析打印行

问题描述 投票:0回答:1

我在pyspark udf函数中有问题,我想打印产生问题的行号。

我尝试使用相当于Python中的静态变量来计算行数,但它不起作用:

import pyspark.sql.functions as F
def myF(input):
    myF.lineNumber += 1
    if (somethingBad):
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

我如何计算pyspark udf中的行?

python python-3.x pyspark user-defined-functions static-variables
1个回答
1
投票

udfs在worker中执行,因此它们内部的print语句不会出现在输出中(来自驱动程序)。处理UDF问题的最佳方法是将UDF的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串res中。

import pyspark.sql.functions as F
def myF(input):
  myF.lineNumber += 1
  if (somethingBad):
    res += 'Error in line {}".format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())
© www.soinside.com 2019 - 2024. All rights reserved.