我在pyspark udf函数中有问题,我想打印产生问题的行号。
我尝试使用相当于Python中的静态变量来计算行数,但它不起作用:
import pyspark.sql.functions as F
def myF(input):
myF.lineNumber += 1
if (somethingBad):
print(myF.lineNumber)
return res
myF.lineNumber = 0
myF_udf = F.udf(myF, StringType())
我如何计算pyspark udf中的行?
udfs在worker中执行,因此它们内部的print语句不会出现在输出中(来自驱动程序)。处理UDF问题的最佳方法是将UDF的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串res中。
import pyspark.sql.functions as F
def myF(input):
myF.lineNumber += 1
if (somethingBad):
res += 'Error in line {}".format(myF.lineNumber)
return res
myF.lineNumber = 0
myF_udf = F.udf(myF, StringType())