Spark结构化流和批处理是否相同?

问题描述 投票:0回答:1

我有两个火花工作。一个是批处理作业,另一个是结构化的流作业。两者都写入相同的文件接收器。两者具有相同的架构。但是,从此接收器读取数据时,spark仅读取由流作业创建的文件,而跳过由批处理作业创建的文件。我可以在文件接收器文件夹中看到目录_spark_metadata。当我删除该文件夹时,spark开始读取所有文件。但是,这将永远不可能,因为在下一个微型批处理中,spark将在此处创建另一个_spark_metadata文件夹。如何从Spark中读取此接收器中的所有文件。

apache-spark hadoop pyspark apache-spark-sql spark-structured-streaming
1个回答
0
投票

我有同样的问题,如果您从流目录中读取数据,有时您可能会遇到以下问题。我使用下面的代码来解决问题,并且对我有用。可能在下面的代码将帮助您。

java.IO.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

而不是在加载数据时指向HDFS目录并获取所需的文件路径并将这些路径传递给spark load方法。

在下面的代码中,您可以更好地控制要读取和忽略的文件。

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}

implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}

def listFiles(spark: SparkSession,path: String) = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)    
    .listFiles(new Path(path),true)
    .toList.map(_.getPath)
    .filter(!_.toString.contains("_spark_metadata"))
    .map(_.toString)
}

val files = listFiles(spark,kafka.read.hdfsLocation)
require(files.isEmpty, s"Files are not available to process data.")
spark
    .read
    .format(read.format)
    .options(read.options)
    .load(files:_*)

© www.soinside.com 2019 - 2024. All rights reserved.