我想根据行的长度使用:Pyspark shell过滤掉从数据集创建的RDD
我的数据文件如下所示
> fzDTn342L3Q djjohnnykey 599 Music 185 1005 3.67 3 1 KDrJSNIGNDQ MacjQFNVLlQ oZ6f2vaH858 fYSjMDNa4S8 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA kOq6sFmoUr0 IRj1IABVBis AVsZ0VH3eN4 r1pS_4qouUc YgaNW1KRgK4 ZlGdVR7mBy4 nKFLE3DX4OQ EtQjN6CQeCc afe-0VY4YiI ekV5NseEdy8 IQs6CrER5fY jTLcoIxMI-E yfvW1ITcMpM
>
> kOq6sFmoUr0 djjohnnykey 599 Music 113 992 0 0 1 MacjQFNVLlQ fYSjMDNa4S8 4vso1y_-cvk 8BwAX6YBx3E QeUQyf8H7vM jmc21-Nhewg hZUU2-UBaGk SaLaotssH0w PUlcrBaYpwI tjIK2xop4L0 BNlL15OYnFY _pzP7OLInjk 4daGJ6TMcp4 _8jM9R-1yRk KDrJSNIGNDQ oZ6f2vaH858 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA fzDTn342L3Q
这里第4列是类别。数据文件中的某些行不包含此字段,因此长度较短。这促使我根据此标准过滤掉数据集,并进一步在具有类别的数据集上形成RDD。
我试图从数据集创建初始RDD。
>>> data="/Users/sk/Documents/BigData/0222/0.txt"
>>> input = sc.textFile(data)
现在我按选项卡拆分并保存在RDDS行中
>>> lines = input.map(lambda x: (str(x.split('\t'))))
在此之后我想过滤掉长度小于3的线。
>>> data="/Users/sk/Documents/BigData/0222/1.txt"
>>> input = sc.textFile(data)
>>> lines = input.map(lambda x: (str(x.split('\t'))))
>>> lines.count()
3169
>>> newinput=input.filter(lambda x: len(x)>3)
>>> newinput.count()
3169
在此之后它不会改变我的rdd中的任何内容。任何人都可以帮忙。
您的解决方案有几个方面。鉴于这是Python(您可能想重新考虑它),不确定是否建议使用RDD。使用Dataframes会更容易,更高效。
>>> x = spark.read.option("sep","\t").csv("/data/youtubedata.txt")
>>> x.count()
4100
>>> from pyspark.sql.functions import length
>>> from pyspark.sql.functions import col, size
>>> x.filter(length(col("_c3")) > 3).count()
4066
>>> x.filter(x._c3.isNull()).count()
34
>>> x.filter(x._c3.isNotNull()).count()
4066
更新:已更新计数。