将pyspark流中的Dstream中的列表/数组展平

问题描述 投票:-1回答:1

我正在通过火花流处理一个Kafka主题,我需要计算数组中所有出现的值。它与规范的单词计数示例相似,不同之处在于我的输入数据是字符串列表。全面披露:我对火花一无所知。

["#epstein", "#didnt", "#kill", "#himself"]
["#foo", "#didnt", "#bar"]

需要变得像

#epstein  1
#foo 1
#didnt 2
#kill 1
#himself 1
#bar 1

我可以走得很远,从kafka消息的其余部分中提取主题标签数组,并且可以将其打印到控制台,但是我不知道如何拆分/计数它。

zookeeper = '10.0.8.111:2181'
kafka_topic = 'twitter_short_json'

sc = SparkContext(appName="CountHashtags")
sc.setLogLevel("ERROR")

# sets the stream to run in 5 second increments
ssc = StreamingContext(sc, 5)

kafkaStream = KafkaUtils.createStream(ssc, zookeeper, 'streaming-group', {kafka_topic: 1})

# parse the Kafka stream as json, returns a DStream object
hashtagsDStream = kafkaStream.map(lambda x: x[1]) \
                         .map(lambda j: json.loads(j)) \
                         .map(lambda p: ((p['hashtags']),))
hashtagsDStream.pprint()

如果有RDD,则可以使用如下所示的explode方法:

exploded = hashtagsDStream.withColumn("hashtags", explode(hashtagsDStream.hashtags))

exploded.registerTempTable('exploded_table')
sqlDF = sqlContext.sql('select count(*), hashtags from exploded_table group by hashtags order by 1 desc').show()

但是DStream没有withColumn方法,所以我被困在如何对数组中的实际hashtag值进行计数上。

python arrays apache-spark pyspark streaming
1个回答
0
投票

DStreamRDD的流。您可以调用hashtagsDStream.foreachRDD(rdd ... ),然后在其中写下您将收到的每个rdd的内容。

© www.soinside.com 2019 - 2024. All rights reserved.