如何使用pyspark流计算csv文件中的条目数

问题描述 投票:0回答:1

我有一个包含.csv文件编号的监视器目录。我需要计算即将到来的number of entries文件中的每个.csv。我想在pyspark流上下文中执行此操作。这就是我所做的,

my_DStream = ssc.textFileStream(monitor_Dir)
test = my_DStream.flatMap(process_file)  # process_file function simply process my file. e.g line.split(";")
print(len(test.collect()))

这不会给我想要的结果。例如,file1.csv包含10项,file2.csv包含18项等,因此我需要查看输出

10
18
..
..
etc

如果我只有一个静态文件并使用rdd操作,则可以完成相同的任务。

python-3.x pyspark bigdata spark-streaming rdd
1个回答
1
投票

如果有人感兴趣,这就是我所做的。

my_DStream = ssc.textFileStream(monitor_Dir)
DStream1 = my_DStream.flatMap(process_file) 
DStream2 = DStream1.filter(lambda x: x[0])
lines_num = DStream2.count() 
lines_num.pprint()

这给了我想要的输出。

© www.soinside.com 2019 - 2024. All rights reserved.