如何加载目录中的某些文件并在 Spark Streaming 中监视该目录中的新文件而不会丢失?

问题描述 投票:0回答:1

我有一个hdfs目录,其中包含许多文件:

/user/root/1.txt
/user/root/2.txt
/user/root/3.txt
/user/root/4.txt

并且有一个守护进程每分钟向该目录添加一个文件。 (例如,5.txt、6.txt、7.txt...)

我想启动一个 Spark Streaming 作业,加载 3.txt、4.txt,然后检测 4.txt 之后的所有新文件。

请注意,由于这些文件很大,处理这些文件将需要很长时间。因此,如果我在启动流任务之前处理 3.txt 和 4.txt,则在处理 3.txt 和 4.txt 期间可能会将 5.txt、6.txt 生成到此目录中。当流任务开始时,5.txt和6.txt将被错过处理,因为它只会从新文件(从7.txt)处理

我不确定我是否描述清楚问题,如果您有任何疑问,请问我

apache-spark spark-streaming
1个回答
1
投票

我找到了解决方案:

根据文档API:https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext

def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]

创建一个输入流,用于监视 Hadoop 兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。

我们可以设置过滤功能来过滤文件< 4.txt

然后将“newFilesOnly”设置为 false

© www.soinside.com 2019 - 2024. All rights reserved.