Databricks spark.read csv具有行#torefresh

问题描述 投票:0回答:1

我将要读取csv到dataframe1.我创建结构2.加载csv spark.read.option(“ header”,“ false”)。schema(schema).option('delimiter',',')。option('mode','PERMISSIVE')。csv(path1) enter image description here

如何检查哪些文件/哪些行获取#torefresh和null ...…???

dataframe apache-spark databricks
1个回答
0
投票

要知道哪些文件包含那些行,您可以使用input_file_name中的pyspark.sql.functions函数>

e.g。

df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).show()

这样,您还可以轻松获得每个文件只有一行的汇总

df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).groupBy("file").count().show()

+--------------------+-----+
|                file|count|
+--------------------+-----+
|file:///C:/Users/...|  119|
|file:///C:/Users/...|  131|
|file:///C:/Users/...|  118|
|file:///C:/Users/...|  127|
|file:///C:/Users/...|  125|
|file:///C:/Users/...|  116|
+--------------------+-----+

我不知道一种在原始文件中查找行号的好方法-将CSV加载到DataFrame的那一刻,这些信息几乎丢失了。有一个row_number函数,但是它在窗口上工作,因此数字将取决于您定义窗口分区/排序的方式。

如果使用本地文件系统,则可以尝试再次手动读取csv并找到行号,如下所示:

import csv
from pyspark.sql.functions import udf
from pyspark.sql.types import *

@udf(returnType=ArrayType(StringType()))
def getMatchingRows(filePath):
    with open(filePath.replace("file:///", ""), 'r') as file:
      reader = csv.reader(file)
      matchingRows = [index for index, line in enumerate(reader) if line[0] == "#torefresh"]
      return matchingRows

withRowNumbers = df.where("col1 == '#torefresh'")\
    .withColumn("file", input_file_name())\
    .groupBy("file")\
    .count()\
    .withColumn("rows", getMatchingRows("file"))
withRowNumbers.show()

+--------------------+-----+--------------------+
|                file|count|                rows|
+--------------------+-----+--------------------+
|file:///C:/Users/...|  119|[1, 2, 4, 5, 6, 1...|
|file:///C:/Users/...|  131|[1, 2, 3, 6, 7, 1...|
|file:///C:/Users/...|  118|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...|  127|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...|  125|[1, 2, 3, 5, 6, 7...|
|file:///C:/Users/...|  116|[1, 2, 3, 5, 7, 8...|
+--------------------+-----+--------------------+

但是这会非常低效,如果您希望在许多文件中包含这些行,那么它就超过了使用DataFrames的地步。我建议您使用数据源并在创建时启用某种Id,但这当然是可以的,除非您只需要知道该文件包含任何内容即可。

如果除了知道第一个值是“ #torefresh”,您还需要将所有其他值都设置为空,则可以扩展where过滤器并手动检查udf。

© www.soinside.com 2019 - 2024. All rights reserved.