PySpark pickle error when calling a UDF

Question · 0 votes · 1 answer

I get a pickle error when I run this. I'm new to PySpark, so I wouldn't be surprised if I've missed something obvious. Can someone help me understand my mistake? The code is supposed to join the words of every matched keyword in the text with an underscore.

from pyspark.sql.types import StringType, Row
from pyspark.sql import functions as F, SparkSession
spark = SparkSession.builder.appName('kwTool').getOrCreate()
sentence_tokenized_docs = spark.createDataFrame(
    [Row(sentence='machine learning is a core domain of the artificial intelligence.'),
     Row(sentence='it contains multiple algorithms.'),
     Row(sentence='neural networks are efficient when there is a lot of training data')])
combinations = spark.createDataFrame(
    [Row(keyword='machine_learning', key='machine learning'),
     Row(keyword='artificial_intelligence', key='artificial intelligence'),
     Row(keyword='neural_networks', key='neural networks')])
def combine_keywords(sentence: str):
    useful_keywords = combinations.filter('locate(key, "{}") != 0'.format(sentence))
    useful_keywords = list(useful_keywords.select('key').collect()[0])
    for key in sorted(useful_keywords, key=len, reverse=True):
        sentence = sentence.replace(key, key.replace(' ', '_'))
    return sentence
combined_udf = F.udf(combine_keywords, StringType())
updated_sentences = sentence_tokenized_docs.withColumn(
    'combined_keywords', combined_udf(sentence_tokenized_docs['sentence'])).drop('sentence')
updated_sentences.show()

Edit: after some more debugging, I've concluded that I apparently can't access a DataFrame from inside the function like this. Can anyone suggest a fix?

python-3.x pyspark pyspark-sql
1 Answer

0 votes

Is there a reason you want to access the combinations DataFrame from inside the UDF, rather than computing useful_keywords once, outside the function? A UDF is pickled and shipped to the executors, and a DataFrame (which holds a reference to the SparkSession) cannot be pickled.

# Collect the keys once on the driver; the UDF then closes over a plain
# Python list instead of a DataFrame, which Spark can pickle.
useful_keywords = [row['key'] for row in combinations.select('key').collect()]

def combine_keywords(sentence: str, useful_keywords=useful_keywords):
    # Replace longer keys first so overlapping keys don't clobber each other.
    for key in sorted(useful_keywords, key=len, reverse=True):
        sentence = sentence.replace(key, key.replace(' ', '_'))
    return sentence
combined_udf = F.udf(combine_keywords, StringType())
updated_sentences = sentence_tokenized_docs.withColumn(
    'combined_keywords', combined_udf(sentence_tokenized_docs['sentence'])).drop('sentence')
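To see why this works, here is a Spark-free sketch of the same idea: once the function only closes over a plain list, the standard pickle module can serialize it, which is exactly what Spark needs to do to ship the UDF to executors. The keyword list below is hard-coded for illustration, standing in for the collected `key` column.

```python
import pickle

# Stand-in for the list collected from the combinations DataFrame.
useful_keywords = ['machine learning', 'artificial intelligence', 'neural networks']

def combine_keywords(sentence, useful_keywords=useful_keywords):
    # Replace longer keys first so overlapping keys don't clobber each other.
    for key in sorted(useful_keywords, key=len, reverse=True):
        sentence = sentence.replace(key, key.replace(' ', '_'))
    return sentence

# Succeeds: the closure captures only a plain list, not a DataFrame.
pickle.dumps(combine_keywords)

print(combine_keywords('machine learning is a core domain of the artificial intelligence.'))
# machine_learning is a core domain of the artificial_intelligence.
```

The original version failed because `combinations` (a DataFrame) was captured by the UDF's closure, and pickling it raises the error you saw.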