在pyspark中读取来自不同路径的csv文件即使一条路径不存在也会失败。
Logs = spark.read.load(Logpaths, format="csv", schema=logsSchema, header="true", mode="DROPMALFORMED");
这里Logpaths是一个包含多个路径的数组。并且这些路径是根据给定的startDate和endDate范围动态创建的。如果Logpath包含5个路径,并且前3个存在但第4个不存在。然后整个提取失败。如何在pyspark中避免这种情况,或者如何在阅读之前检查是否存在?
在scala中,我通过检查文件存在并使用hadoop hdfs文件系统globStatus函数过滤掉不存在的记录来完成此操作。
Path = '/bilal/2018.12.16/logs.csv'
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val fileStatus = fs.globStatus(new org.apache.hadoop.fs.Path(Path));
所以我得到了我想要的东西。就像我在问题中发布的代码一样,可以在scala中用于文件存在检查。在PySpark的情况下,我们可以使用下面的代码。
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("bilal/logs/log.csv"))
这与scala中使用的代码完全相同,因此在这种情况下,我们使用java库来运行hadoop,并且在运行spark的JVM上运行java代码。