What I want to do here is compute N-grams using the code from this Stack Overflow answer for N-grams.
The data below is just test data; the actual computation will run on a large, distributed dataset:
+--------------+
| author|
+--------------+
|Test Data Five|
|Test Data Five|
|Data Test Five|
|Test data Five|
|Test Data Five|
| Jack|
+--------------+
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

def build_ngrams(name, n=3):
    # One NGram stage per order i, writing to columns 1_grams .. n_grams
    ngrams = [
        NGram(n=i, inputCol=name, outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]
    return Pipeline(stages=ngrams)

# Tokenize each author string, then collapse all rows into one token array
temp_kdf = author_df.withColumn("author", F.split("author", r"\s+"))
temp_kdf = temp_kdf.groupby().agg(F.collect_list('author').alias('author'))
data = temp_kdf.select(F.flatten(temp_kdf.author).alias('author'))
temp_kdf = build_ngrams('author').fit(data).transform(data)
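For context, build_ngrams(n=3) creates one output column per n-gram order; a quick pure-Python check of the column-name pattern it uses (no Spark needed):

```python
# Sanity check of the outputCol naming used by build_ngrams above
n = 3
names = ["{0}_grams".format(i) for i in range(1, n + 1)]
print(names)  # ['1_grams', '2_grams', '3_grams']
```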
The result I get looks like this:
+--------------------+
| 2_grams|
+--------------------+
|[Test Data, Data Five, Five Test, Test Data, Data Five, Five Data, Data Test, Test Five, Five Test, Test data, data Five, Five Test, Test Data, Data Five, Five Jack]|
+--------------------+
What I want instead is the top 'N' rows of each 'n_gram' column together with their frequency counts, like this >>
+---------+--------+
|  2_grams|2_counts|
+---------+--------+
|Test Data|       3|
|Data Five|       3|
|Five Test|       3|
|Five Data|       1|
+---------+--------+
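The counting step itself is an ordinary word-frequency count. As a minimal pure-Python sketch of the logic (using collections.Counter instead of Spark, on the 2-gram list shown above):

```python
from collections import Counter

# The 2-grams from the pipeline output above, copied verbatim
grams = ["Test Data", "Data Five", "Five Test", "Test Data", "Data Five",
         "Five Data", "Data Test", "Test Five", "Five Test", "Test data",
         "data Five", "Five Test", "Test Data", "Data Five", "Five Jack"]

counts = Counter(grams)       # frequency of every 2-gram
top4 = counts.most_common(4)  # top-N rows by frequency (here N=4)
for gram, c in top4:
    print(gram, c)            # the three most frequent 2-grams appear 3 times each
```

The same counts (3, 3, 3, 1) match the desired table; in Spark the equivalent is a groupBy/count or the reduceByKey shown below.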
# col is the name of an n-gram column, e.g. "2_grams"
(temp_data
    .select(col)
    .rdd
    .flatMap(lambda doc: [(x, 1) for x in doc[0]])  # emit (ngram, 1) pairs
    .reduceByKey(lambda x, y: x + y)                # sum counts per ngram
    .sortBy(lambda kv: -kv[1])                      # highest counts first
    .take(N))                                       # keep the top N rows