I have a function that tries to pass a broadcast variable to a UDF.
The function is as follows:
def generate_lookup_code(self, lookup_map):
    lookup_map_broadcast = spark_session.sparkContext.broadcast(lookup_map)
    print("lookup_map has been broadcasted")

    #### UDF function only return a constant string###
    def _generate_code(bc_reasoncode_lookup_map):
        reasoncode_lookup_map = bc_reasoncode_lookup_map.value
        return "hello"

    udfGenerateCode = F.udf(_generate_code, StringType())
    input_df = input_df.withColumn('code', udfGenerateCode(lookup_map_broadcast))
    input_df.show()
My intention is only to pass the broadcast variable to the UDF, but I get this error:
'Broadcast' object has no attribute '_get_object_id'
What am I doing wrong?
You don't need to pass the broadcast variable as a UDF argument. Just reference it from inside the function:
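from pyspark.sql import functions as F
from pyspark.sql.types import StringType

lookup_map_broadcast = spark_session.sparkContext.broadcast(lookup_map)

def _generate_code():
    # Read the broadcast value through the closure, not through a UDF argument
    reasoncode_lookup_map = lookup_map_broadcast.value
    return "hello"

udfGenerateCode = F.udf(_generate_code, StringType())
input_df = input_df.withColumn('code', udfGenerateCode())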
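The UDF is called once for each row, and its arguments can only be columns or literals, so a Broadcast object cannot be passed to it directly.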
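If the UDF should actually use the lookup map rather than return a constant, the same closure idea can be sketched roughly as below. This is only a sketch under assumptions: `make_code_lookup`, `add_code_column`, the `"reason"` column name and the `"UNKNOWN"` default are hypothetical names that do not appear in the question, and the map is assumed to be a plain dict. The closure is written so that it can be checked locally with a stand-in object exposing `.value`, without a Spark session.

```python
from types import SimpleNamespace


def make_code_lookup(broadcast_map, default="UNKNOWN"):
    """Build a one-argument function that closes over a broadcast handle.

    `broadcast_map` only needs a `.value` attribute holding a dict, which is
    what a PySpark Broadcast object provides.
    """
    def _lookup(key):
        # .value is read inside the function body, so the dictionary is
        # reached through the closure instead of being passed as a column.
        return broadcast_map.value.get(key, default)
    return _lookup


def add_code_column(spark, df, lookup_map, key_col="reason"):
    """Wire the closure into a UDF; `key_col` is a hypothetical column name."""
    # Imported lazily so the closure above can be exercised without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    bc = spark.sparkContext.broadcast(lookup_map)
    lookup_udf = F.udf(make_code_lookup(bc), StringType())
    # Only a real column is passed as the UDF argument, never the broadcast.
    return df.withColumn("code", lookup_udf(F.col(key_col)))


# Local check of the closure with a stand-in for the Broadcast object.
fake_bc = SimpleNamespace(value={"A1": "approved", "R2": "rejected"})
lookup = make_code_lookup(fake_bc)
print(lookup("A1"))  # approved
print(lookup("ZZ"))  # UNKNOWN
```

With a real session this would be called as something like `add_code_column(spark_session, input_df, lookup_map)`, where the per-row key arrives as a column and the map travels with the function as a broadcast.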