如何根据数据集中的行长度过滤RDD。

问题描述 投票:0回答:1

我想根据行的长度使用:Pyspark shell过滤掉从数据集创建的RDD

我的数据文件如下所示

> fzDTn342L3Q   djjohnnykey 599 Music   185 1005    3.67    3   1   KDrJSNIGNDQ MacjQFNVLlQ oZ6f2vaH858 fYSjMDNa4S8 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA kOq6sFmoUr0 IRj1IABVBis AVsZ0VH3eN4 r1pS_4qouUc YgaNW1KRgK4 ZlGdVR7mBy4 nKFLE3DX4OQ EtQjN6CQeCc afe-0VY4YiI ekV5NseEdy8 IQs6CrER5fY jTLcoIxMI-E yfvW1ITcMpM
> 
> kOq6sFmoUr0   djjohnnykey 599 Music   113 992 0   0   1   MacjQFNVLlQ fYSjMDNa4S8 4vso1y_-cvk 8BwAX6YBx3E QeUQyf8H7vM jmc21-Nhewg hZUU2-UBaGk SaLaotssH0w PUlcrBaYpwI tjIK2xop4L0 BNlL15OYnFY _pzP7OLInjk 4daGJ6TMcp4 _8jM9R-1yRk KDrJSNIGNDQ oZ6f2vaH858 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA fzDTn342L3Q

这里第4列是类别。数据文件中的某些行不包含此字段,因此长度较短。这促使我根据此标准过滤掉数据集,并进一步在具有类别的数据集上形成RDD。

我试图从数据集创建初始RDD。

>>> data="/Users/sk/Documents/BigData/0222/0.txt"
>>> input = sc.textFile(data)

现在我按选项卡拆分并保存在RDDS行中

>>> lines = input.map(lambda x: (str(x.split('\t'))))

在此之后我想过滤掉长度小于3的线。

>>> data="/Users/sk/Documents/BigData/0222/1.txt"
>>> input = sc.textFile(data)
>>> lines = input.map(lambda x: (str(x.split('\t'))))
>>> lines.count()
3169

>>> newinput=input.filter(lambda x: len(x)>3)
>>> newinput.count()
3169

在此之后它不会改变我的rdd中的任何内容。任何人都可以帮忙。

python apache-spark filter pyspark rdd
1个回答
0
投票

您的解决方案有几个方面。鉴于这是Python(您可能想重新考虑它),不确定是否建议使用RDD。使用Dataframes会更容易,更高效。

>>> x =  spark.read.option("sep","\t").csv("/data/youtubedata.txt")
>>> x.count()
4100
>>> from pyspark.sql.functions import length
>>> from pyspark.sql.functions import col, size
>>> x.filter(length(col("_c3")) > 3).count()
4066
>>> x.filter(x._c3.isNull()).count()
34
>>> x.filter(x._c3.isNotNull()).count()
4066

更新:已更新计数。

© www.soinside.com 2019 - 2024. All rights reserved.