Error when passing a broadcast variable to a UDF in PySpark


I have a function that tries to pass a broadcast variable to a UDF.

The function looks like this:

def generate_lookup_code(self, lookup_map):

    lookup_map_broadcast = spark_session.sparkContext.broadcast(lookup_map)
    print("lookup_map has been broadcasted")

    # The UDF only returns a constant string
    def _generate_code(bc_reasoncode_lookup_map):

        reasoncode_lookup_map = bc_reasoncode_lookup_map.value
        return "hello"


    udfGenerateCode = F.udf(_generate_code, StringType())

    input_df = input_df.withColumn('code', udfGenerateCode(lookup_map_broadcast))

    input_df.show()

All I want to do is pass the broadcast variable to the UDF, but I get this error:

'Broadcast' object has no attribute '_get_object_id'

What am I doing wrong?

apache-spark pyspark broadcast
1 Answer

You don't need to pass the broadcast variable as a UDF argument; just reference it from inside the function:

lookup_map_broadcast = spark_session.sparkContext.broadcast(lookup_map)

def _generate_code():
    reasoncode_lookup_map = lookup_map_broadcast.value
    return "hello"

udfGenerateCode = F.udf(_generate_code, StringType())
input_df = input_df.withColumn('code', udfGenerateCode())

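The UDF is called once for every row, and its arguments must be columns or literals (for example, values wrapped with F.lit). A Broadcast object is neither, which is why passing it to udfGenerateCode raises the error in the question.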
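To make the pattern a bit more concrete, here is a minimal sketch in which the UDF actually uses the broadcast value. It assumes lookup_map is a plain dict and that the DataFrame has a string column named reason; the column name, the "unknown" fallback, and the helper names make_generate_code and add_code_column are all hypothetical and not part of the original question.

```python
def make_generate_code(bc_lookup_map):
    """Build the per-row function; the broadcast handle is captured by closure."""
    def _generate_code(reason):
        # .value is read inside the function, i.e. on the executor for each row.
        # "unknown" is an assumed fallback for keys missing from the map.
        return bc_lookup_map.value.get(reason, "unknown")
    return _generate_code


def add_code_column(spark, input_df, lookup_map, reason_col="reason"):
    """Add a 'code' column by looking up reason_col in a broadcast dict."""
    # Imported here so make_generate_code can be exercised without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    bc_lookup_map = spark.sparkContext.broadcast(lookup_map)
    generate_code = F.udf(make_generate_code(bc_lookup_map), StringType())
    # Only the column is passed as a UDF argument; the broadcast never is.
    return input_df.withColumn("code", generate_code(F.col(reason_col)))
```

With an existing SparkSession and a DataFrame that has a reason column, calling add_code_column(spark_session, input_df, lookup_map) should return the DataFrame with the extra code column. Wrapping the function in a factory is just one way to keep the broadcast out of the UDF's argument list; referencing a variable from the enclosing scope, as in the answer above, works the same way.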
