Error: While running abbreviation_column_method. Failed with exception: Column is not iterable

Question (votes: 0, answers: 1)
abbreviation_mapping = {
        "E": "Europe",
        "A": "US/Canada",
        "S": "South America",
        "O": "Australia",
        "Z": "New Zealand",
        "N": "New Delhi/Kolkata",
        "U": "New Delhi/Kolkata",
        "": "New Delhi/Kolkata"
        # Add more mappings as needed
    }
def abbreviation_column(self,df,abbreviation_mapping,col_name):
        """
        This function will replace abbreviation with long form
        :param df: input dataframe
        :param abbreviation_mapping: Give abbreviation in the form of dictionary
        :param col_name: column name on which abbreviation_column need to apply
        :return: df
        """
        try:
            df = df.withColumn(
                col_name,
                when(cast(col(col_name).isin(abbreviation_mapping.keys()),int)| (col(col_name) == ""),
                     col(col_name).replace("", "New Delhi/Kolkata").replace(*abbreviation_mapping.items()))
                .otherwise(col(col_name))
            )
        except Exception as e:
            raise Exception(f"Error: While running abbreviation_column_method. Failed with exception: {e}")
        return df

I want every abbreviation in the "dst" column to be replaced with its long form.

python pyspark etl
1 answer
0 votes

You can use a UDF for this; see the code below.

from pyspark.sql.functions import col, udf

abbreviation_mapping = {
        "E": "Europe",
        "A": "US/Canada",
        "S": "South America",
        "O": "Australia",
        "Z": "New Zealand",
        "N": "New Delhi/Kolkata",
        "U": "New Delhi/Kolkata",
        "": "New Delhi/Kolkata"
        # Add more mappings as needed
    }

# Look up the long form; values without a mapping are returned unchanged.
abbreviationUDF = udf(lambda short_form: abbreviation_mapping.get(short_form, short_form))

def abbreviation_column(df,abbreviation_mapping,col_name):
        """
        This function will replace abbreviation with long form
        :param df: input dataframe
        :param abbreviation_mapping: Give abbreviation in the form of dictionary
        :param col_name: column name on which abbreviation_column need to apply
        :return: df
        """
        try:
            df = df.withColumn(col_name, abbreviationUDF(col(col_name)))
        except Exception as e:
            raise Exception(f"Error: While running abbreviation_column_method. Failed with exception: {e}")
        return df
    
df2 = abbreviation_column(df, abbreviation_mapping, "dst")
