How do I split out the year in this Pyspark mapreduce code?

Problem description (votes: 0, answers: 2)

I need to count, for each word, how many articles in each year contain it. I am stuck on how to separate the year from the words, because I keep getting the first word joined to the date, as shown below.

['20191124,woman', 'stabbed', 'adelaide', 'shopping', 'centre', '20191204,economy', 'continue', 'teetering',
 'edge', 'recession', '20200401,coronanomics', 'learnt', 'coronavirus', 'economy', '20200401,coronavirus',
 'home', 'test', 'kits', 'selling', 'chinese', 'community', '20201015,coronavirus', 'pacific', 'economy',
 'foriegn', 'aid', 'china', '20201016,china', 'builds', 'pig', 'apartment', 'blocks', 'guard', 'swine', 'flu',
 '20211216,economy', 'starts', 'bounce', 'unemployment', '20211224,online', 'shopping', 'rise', 'due',
 'coronavirus', '20211229,china', 'close', 'encounters', 'elon', 'musks']

Input:

20191124,woman stabbed adelaide shopping centre
20191204,economy continue teetering edge recession
20200401,coronanomics learnt coronavirus economy
20200401,coronavirus home test kits selling chinese community
20201015,coronavirus pacific economy foriegn aid china
20201016,china builds pig apartment blocks guard swine flu
20211216,economy starts bounce unemployment
20211224,online shopping rise due coronavirus
20211229,china close encounters elon musks

Output, for example:

adelaide    2019:1
aid    2020:1
apartment    2020:1
blocks    2020:1
...

Here is my current code:

from pyspark import SparkContext, SparkConf

sc = SparkContext('local','frequencies')

text = sc.textFile('abcnews.txt')

word_year_pairs = text.flatMap(lambda line: [((word, line[:4]), 1) for word in line.split()])

word_counts = word_year_pairs.reduceByKey(lambda a,b: a+b)

result = word_counts.map(lambda pair: (pair[0][0] + '\t' + pair[0][1] + ":" + str(pair[1])))

final = result.sortByKey()

final.saveAsTextFile('rdd.py')
python python-3.x pyspark mapreduce rdd
2 Answers
1 vote

See the implementation below.

Input data

import pandas as pd
from io import StringIO

ser = """20191124,woman stabbed adelaide shopping centre
20191204,economy continue teetering edge recession
20200401,coronanomics learnt coronavirus economy
20200401,coronavirus home test kits selling chinese community
20201015,coronavirus pacific economy foriegn aid china
20201016,china builds pig apartment blocks guard swine flu
20211216,economy starts bounce unemployment
20211224,online shopping rise due coronavirus
20211229,china close encounters elon musks"""

pdf = pd.read_csv(StringIO(ser),names=['date', 'stringColumn'])
df = spark.createDataFrame(pdf)

(df
  .write
  .mode("overwrite")
  .csv("/test/tstfile.txt")
  )

Desired output

lines = sc.textFile("/test/tstfile.txt")

# Split the date (everything before the comma) off the headline, then split the headline into words
word_year_rdd = lines.flatMap(lambda line: [(word, line[:4]) for word in line.split(',', 1)[1].split()])

# Count the number of articles each word appears in for each year
word_year_count_rdd = word_year_rdd.map(lambda pair: (pair, 1)).reduceByKey(lambda a, b: a + b)

# Format the output as (word, year:count) pairs
output_rdd = word_year_count_rdd.map(lambda pair: (pair[0][0], pair[0][1] + ":" + str(pair[1])))

# Print the output
output_rdd.toDF().orderBy("_1").show(5)

+---------+------+
|       _1|    _2|
+---------+------+
| adelaide|2019:1|
|      aid|2020:1|
|apartment|2020:1|
|   blocks|2020:1|
|   bounce|2021:1|
+---------+------+
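Note that the question asks how many articles per year contain each word, while the count above is per word occurrence, so a headline that repeated a word would be counted twice. A minimal sketch of one way to count articles instead (this is an addition, not part of the original answer; on the sample data the numbers are identical, since no headline repeats a word):

# Sketch: dedupe the words of each headline with set(), so each article adds at
# most 1 to a given (word, year) pair. `lines` is the RDD read above.
article_word_year = lines.flatMap(
    lambda line: [((word, line[:4]), 1) for word in set(line.split(',', 1)[1].split())]
)
word_year_count_rdd = article_word_year.reduceByKey(lambda a, b: a + b)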

1 vote

Here is a simple way to do it using Spark's higher-level DataFrame API.

# Convert your RDD of lines to a DataFrame; the 'string' schema gives a single column named 'value'
df = text.toDF('string')

result = df.selectExpr(
    "substring(value, 0, 4) as year", 
    "explode(split(substring(value, 10), ' ')) as word"
).groupBy('word', 'year').count().orderBy('word')

Result

result.show()

+------------+----+-----+
|        word|year|count|
+------------+----+-----+
|    adelaide|2019|    1|
|         aid|2020|    1|
|   apartment|2020|    1|
|      blocks|2020|    1|
|      bounce|2021|    1|
|      builds|2020|    1|
|      centre|2019|    1|
|       china|2020|    2|
|       china|2021|    1|
|     chinese|2020|    1|
|       close|2021|    1|
|   community|2020|    1|
|    continue|2019|    1|
|coronanomics|2020|    1|
| coronavirus|2021|    1|
| coronavirus|2020|    3|
|         due|2021|    1|
|     economy|2020|    2|
|     economy|2019|    1|
|     economy|2021|    1|
+------------+----+-----+
only showing top 20 rows
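If you also need the tab-separated word / year:count layout shown in the question, one way (a sketch, not part of the original answer; the output path is only an example) is to concatenate the columns and write them out as plain text:

# Sketch: format each row as "word<TAB>year:count" and save as plain text files.
# The '\t' inside the SQL expression is a literal tab; "word_year_counts" is
# just an example output directory.
result_lines = result.selectExpr(
    "concat(word, '\t', year, ':', cast(`count` as string)) as value"
)
result_lines.write.mode("overwrite").text("word_year_counts")

The part files then contain lines like adelaide\t2019:1.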