是否可以广播字典并将其用作 pyspark 中的查找?

问题描述 投票:0回答:1

我正在 azure databricks 中运行以下代码。

  1. 使用 erp_bu 创建查找字典,它是具有两列“erp_code”和“bu”的行对象列表

  2. 广播查找字典

  3. 尝试在 udf 中查找值

如果这不正确,还有其他方法吗? 我收到一条错误消息,指出广播变量未加载。我尝试将广播变量传递给 udf,但它只接受字符串或列。

erp_bu_dic = {}
for row in erp_bu:
    if row['erp_code']:
        erp_bu_dic[row['erp_code']] = row['bu']
broadcast_erp_bu_dic = sc.broadcast(erp_bu_dic)
j06 = j06.withColumn('business_unit', udf(lambda x : broadcast_erp_bu_dic.value[x])("entity_source_id"))
display(j06)```

apache-spark pyspark databricks azure-databricks
1个回答
0
投票

原则上你的方法应该有效,答案是肯定的,可以广播字典并将其用作查找

如果没有更多细节,很难确定您的问题是什么,但我尝试模拟一个尽可能接近您的描述的场景,并且下面的代码运行良好。

from pyspark.sql.functions import udf
from pyspark.sql import Row

#mock the j06 dataframe        
j06_data = [("1YZ", "meh"), ("X2Z", "blah"), ("XY3", "ha")] 
df_j06 = spark.createDataFrame(j06_data, ["entity_source_id", "other_col"]) 

#mock the business unit dictionary
bu_rows = [Row("1YZ", "BU One"), Row("X2Z", "BU Two"), Row("XY3", "BU Three")]
erp_bu_dict = {}
for row in bu_rows:
    erp_bu_dict[row[0]] = row[1]

#broadcast the dict
broadcast_bu_dict = spark.sparkContext.broadcast(erp_bu_dict)

#define mapping udf
udf_map_bu = udf(lambda x : broadcast_bu_dict.value[x])
#map business unit
df_j06 = df_j06.withColumn('business_unit', udf_map_bu('entity_source_id'))

display(df_j06)

我推测问题出现在您的代码中较早的位置,并且与您的“erp_bu”行列表相关。

我还会考虑将“erp_bu”转换为数据帧并运行连接而不是当前的方法。

© www.soinside.com 2019 - 2024. All rights reserved.