我有两个火花工作。一个是批处理作业,另一个是结构化的流作业。两者都写入相同的文件接收器。两者具有相同的架构。但是,从此接收器读取数据时,spark仅读取由流作业创建的文件,而跳过由批处理作业创建的文件。我可以在文件接收器文件夹中看到目录_spark_metadata。当我删除该文件夹时,spark开始读取所有文件。但是,这将永远不可能,因为在下一个微型批处理中,spark将在此处创建另一个_spark_metadata文件夹。如何从Spark中读取此接收器中的所有文件。
我有同样的问题,如果您从流目录中读取数据,有时您可能会遇到以下问题。我使用下面的代码来解决问题,并且对我有用。可能在下面的代码将帮助您。
java.IO.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
而不是在加载数据时指向HDFS目录并获取所需的文件路径并将这些路径传递给spark load
方法。
在下面的代码中,您可以更好地控制要读取和忽略的文件。
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
override def hasNext: Boolean = remoteIterator.hasNext
override def next(): T = remoteIterator.next()
}
wrapper(remoteIterator)
}
def listFiles(spark: SparkSession,path: String) = {
FileSystem.get(spark.sparkContext.hadoopConfiguration)
.listFiles(new Path(path),true)
.toList.map(_.getPath)
.filter(!_.toString.contains("_spark_metadata"))
.map(_.toString)
}
val files = listFiles(spark,kafka.read.hdfsLocation)
require(files.isEmpty, s"Files are not available to process data.")
spark
.read
.format(read.format)
.options(read.options)
.load(files:_*)