我将要读取csv到dataframe1.我创建结构2.加载csv spark.read.option(“ header”,“ false”)。schema(schema).option('delimiter',',')。option('mode','PERMISSIVE')。csv(path1)
如何检查哪些文件/哪些行获取#torefresh和null ...…???
要知道哪些文件包含那些行,您可以使用input_file_name
中的pyspark.sql.functions
函数>
e.g。
df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).show()
这样,您还可以轻松获得每个文件只有一行的汇总
df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).groupBy("file").count().show() +--------------------+-----+ | file|count| +--------------------+-----+ |file:///C:/Users/...| 119| |file:///C:/Users/...| 131| |file:///C:/Users/...| 118| |file:///C:/Users/...| 127| |file:///C:/Users/...| 125| |file:///C:/Users/...| 116| +--------------------+-----+
我不知道一种在原始文件中查找行号的好方法-将CSV加载到DataFrame的那一刻,这些信息几乎丢失了。有一个
row_number
函数,但是它在窗口上工作,因此数字将取决于您定义窗口分区/排序的方式。
如果使用本地文件系统,则可以尝试再次手动读取csv并找到行号,如下所示:
import csv from pyspark.sql.functions import udf from pyspark.sql.types import * @udf(returnType=ArrayType(StringType())) def getMatchingRows(filePath): with open(filePath.replace("file:///", ""), 'r') as file: reader = csv.reader(file) matchingRows = [index for index, line in enumerate(reader) if line[0] == "#torefresh"] return matchingRows withRowNumbers = df.where("col1 == '#torefresh'")\ .withColumn("file", input_file_name())\ .groupBy("file")\ .count()\ .withColumn("rows", getMatchingRows("file")) withRowNumbers.show() +--------------------+-----+--------------------+ | file|count| rows| +--------------------+-----+--------------------+ |file:///C:/Users/...| 119|[1, 2, 4, 5, 6, 1...| |file:///C:/Users/...| 131|[1, 2, 3, 6, 7, 1...| |file:///C:/Users/...| 118|[1, 2, 3, 4, 5, 7...| |file:///C:/Users/...| 127|[1, 2, 3, 4, 5, 7...| |file:///C:/Users/...| 125|[1, 2, 3, 5, 6, 7...| |file:///C:/Users/...| 116|[1, 2, 3, 5, 7, 8...| +--------------------+-----+--------------------+
但是这会非常低效,如果您希望在许多文件中包含这些行,那么它就超过了使用DataFrames的地步。我建议您使用数据源并在创建时启用某种Id,但这当然是可以的,除非您只需要知道该文件包含任何内容即可。
如果除了知道第一个值是“ #torefresh”,您还需要将所有其他值都设置为空,则可以扩展where
过滤器并手动检查udf。