I need to count, for each word, how many articles contain it in each year. I'm stuck on how to separate the year from the words, because I keep getting the first word of each headline fused to the date, like this:
['20191124,woman', 'stabbed', 'adelaide', 'shopping', 'centre',
 '20191204,economy', 'continue', 'teetering', 'edge', 'recession',
 '20200401,coronanomics', 'learnt', 'coronavirus', 'economy',
 '20200401,coronavirus', 'home', 'test', 'kits', 'selling', 'chinese', 'community',
 '20201015,coronavirus', 'pacific', 'economy', 'foriegn', 'aid', 'china',
 '20201016,china', 'builds', 'pig', 'apartment', 'blocks', 'guard', 'swine', 'flu',
 '20211216,economy', 'starts', 'bounce', 'unemployment',
 '20211224,online', 'shopping', 'rise', 'due', 'coronavirus',
 '20211229,china', 'close', 'encounters', 'elon', 'musks']
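The fused tokens come from splitting on whitespace alone: the date and the first word are separated by a comma, not a space, so `str.split()` keeps them as one token. Splitting on the first comma before splitting on whitespace separates them (a minimal illustration in plain Python):

```python
line = "20191124,woman stabbed adelaide shopping centre"

# whitespace split keeps the date glued to the first word
print(line.split()[0])        # '20191124,woman'

# split on the first comma, then on whitespace
date, text = line.split(',', 1)
print(date[:4])               # '2019'
print(text.split()[0])        # 'woman'
```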
Input:
20191124,woman stabbed adelaide shopping centre
20191204,economy continue teetering edge recession
20200401,coronanomics learnt coronavirus economy
20200401,coronavirus home test kits selling chinese community
20201015,coronavirus pacific economy foriegn aid china
20201016,china builds pig apartment blocks guard swine flu
20211216,economy starts bounce unemployment
20211224,online shopping rise due coronavirus
20211229,china close encounters elon musks
Expected output, for example:
adelaide 2019:1
aid 2020:1
apartment 2020:1
blocks 2020:1
...
Here is my current code:
from pyspark import SparkContext, SparkConf
sc = SparkContext('local','frequencies')
text = sc.textFile('abcnews.txt')
word_year_pairs = text.flatMap(lambda line: [((word, line[:4]), 1) for word in line.split()])
word_counts = word_year_pairs.reduceByKey(lambda a, b: a + b)
result = word_counts.map(lambda pair: (pair[0][0] + '\t' + pair[0][1] + ":" + str(pair[1])))
final = result.sortByKey()
final.saveAsTextFile('rdd.py')
See the implementation below.

Input data:
import pandas as pd
from io import StringIO
ser = """20191124,woman stabbed adelaide shopping centre
20191204,economy continue teetering edge recession
20200401,coronanomics learnt coronavirus economy
20200401,coronavirus home test kits selling chinese community
20201015,coronavirus pacific economy foriegn aid china
20201016,china builds pig apartment blocks guard swine flu
20211216,economy starts bounce unemployment
20211224,online shopping rise due coronavirus
20211229,china close encounters elon musks"""
pdf = pd.read_csv(StringIO(ser), names=['date', 'stringColumn'])
df = spark.createDataFrame(pdf)  # 'spark' is the active SparkSession
(df
.write
.mode("overwrite")
.csv("/test/tstfile.txt")
)
Producing the desired output:
lines = sc.textFile("/test/tstfile.txt")
# split on the first comma so the date never fuses to the first word
word_year_rdd = lines.flatMap(lambda line: [(word, line[:4]) for word in line.split(',', 1)[1].split()])
# Count the number of articles each word appears in for each year
word_year_count_rdd = word_year_rdd.map(lambda pair: (pair, 1)).reduceByKey(lambda a, b: a + b)
# Format the output as (word, year:count) pairs
output_rdd = word_year_count_rdd.map(lambda pair: (pair[0][0], pair[0][1] + ":" + str(pair[1])))
# Print the output
output_rdd.toDF().orderBy("_1").show(5)
+---------+------+
|       _1|    _2|
+---------+------+
| adelaide|2019:1|
|      aid|2020:1|
|apartment|2020:1|
|   blocks|2020:1|
|   bounce|2021:1|
+---------+------+
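One caveat: `reduceByKey` above counts word occurrences, while the question asks how many articles contain each word. The two agree here only because no headline repeats a word; wrapping the word list in `set()` counts each article at most once. The logic in plain Python (with a hypothetical repeated-word headline for illustration):

```python
from collections import Counter

headlines = [
    "20191204,economy continue teetering edge recession",
    "20200401,coronanomics learnt coronavirus economy",
    "20200401,coronavirus economy economy",   # hypothetical: 'economy' repeated
]

article_counts = Counter()
for line in headlines:
    date, text = line.split(',', 1)
    for word in set(text.split()):            # set() -> each article counted once
        article_counts[(word, date[:4])] += 1

print(article_counts[('economy', '2020')])    # 2 articles, not 3 occurrences
```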
Here is a simpler approach using Spark's higher-level DataFrame API:
# convert your RDD of lines to a single-column DataFrame named "value"
# (an RDD of bare strings cannot be converted with toDF directly, so wrap each line in a tuple)
df = text.map(lambda line: (line,)).toDF(['value'])
result = df.selectExpr(
    "substring(value, 1, 4) as year",
    "explode(split(substring(value, 10), ' ')) as word"
).groupBy('word', 'year').count().orderBy('word')
Result:
result.show()
+------------+----+-----+
| word|year|count|
+------------+----+-----+
| adelaide|2019| 1|
| aid|2020| 1|
| apartment|2020| 1|
| blocks|2020| 1|
| bounce|2021| 1|
| builds|2020| 1|
| centre|2019| 1|
| china|2020| 2|
| china|2021| 1|
| chinese|2020| 1|
| close|2021| 1|
| community|2020| 1|
| continue|2019| 1|
|coronanomics|2020| 1|
| coronavirus|2021| 1|
| coronavirus|2020| 3|
| due|2021| 1|
| economy|2020| 2|
| economy|2019| 1|
| economy|2021| 1|
+------------+----+-----+
only showing top 20 rows
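If you need the exact `word<TAB>year:count` text lines from the question rather than a table, the grouped rows only need string formatting and a sort; the string-building step looks like this in plain Python (sample rows taken from the table above):

```python
rows = [("china", "2020", 2), ("adelaide", "2019", 1), ("aid", "2020", 1)]

# format each row as "word<TAB>year:count", then sort alphabetically by word
lines = sorted(f"{word}\t{year}:{count}" for word, year, count in rows)
print(lines[0])   # 'adelaide<TAB>2019:1'
```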