Spark:仅当路径存在时才读取文件

问题描述 投票:0回答:6

我正在尝试读取 scala 中路径

Sequence
中存在的文件。以下是示例(伪)代码:

val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)

现在,在上面的序列中,有些路径存在,有些不存在。有没有办法在读取

parquet
文件时忽略丢失的路径(以避免
org.apache.spark.sql.AnalysisException: Path does not exist
)?

我已经尝试了下面的方法,它似乎有效,但是后来,我最终读了两次相同的路径,这是我想避免做的事情:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)

我检查了

options
DataFrameReader
方法,但似乎没有任何类似于
ignore_if_missing
的选项。

此外,这些路径可以是

hdfs
s3
(这个
Seq
作为方法参数传递),在阅读时,我不知道路径是
s3
还是
hdfs
,所以不能使用
s3
hdfs
特定 API 来检查是否存在。

scala apache-spark parquet
6个回答
24
投票

您可以按照@Psidom的回答过滤掉不相关的文件。在 Spark 中,最好的方法是使用 Spark 内部的 hadoop 配置。鉴于 Spark 会话变量称为“spark”,您可以执行以下操作:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)

3
投票

先过滤

paths
怎么样`:

paths.filter(f => new java.io.File(f).exists)

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)

2
投票

从 Spark 2.3.0 开始,有一个配置

spark.sql.files.ignoreMissingFiles
。只需将其设置为
true
即可。

https://spark.apache.org/docs/latest/configuration.html


1
投票

PySpark 3.1 或更高版本

遗憾的是,从 Spark 3.1 开始,pyspark 中还没有任何标志(至少我不知道)可以忽略它们。但你可以尝试这些简单的事情。好消息是加载界面也适用于列表。见下文。

# add you list of paths here
addrs = ["path1", "path2", ...]

# check if they exists, update the list
for add in addrs:
    try:
        spark.read.format("parquet").load(add)
    except:
        print(add)
        addrs.remove(add)

# read the updated list now
sdf_a = spark\
        .read\
        .format("parquet")\
        .load(addrs)

0
投票

@s510 有一个很好的答案,具有 Python 风格的“鸭子打字”风格。然而,我更喜欢尽可能使用不变性,所以我会像这样重写它:

def path_is_readable(x):
  try:
    spark.read.parquet(x)
    return True
  except:
    return False

valid_paths = [p for p in paths if path_is_readable(p)]
dataframe = spark.read.parquet(*valid_paths)

0
投票

此代码可以在 Spark 执行器上运行

FileSystem.get(new Configuration()).exists(new Path(file_path))
© www.soinside.com 2019 - 2024. All rights reserved.