我有一个hdfs目录,其中包含许多文件:
/user/root/1.txt
/user/root/2.txt
/user/root/3.txt
/user/root/4.txt
并且有一个守护进程每分钟向该目录添加一个文件。 (例如,5.txt、6.txt、7.txt...)
我想启动一个 Spark Streaming 作业,加载 3.txt、4.txt,然后检测 4.txt 之后的所有新文件。
请注意,由于这些文件很大,处理这些文件将需要很长时间。因此,如果我在启动流任务之前处理 3.txt 和 4.txt,则在处理 3.txt 和 4.txt 期间可能会将 5.txt、6.txt 生成到此目录中。当流任务开始时,5.txt和6.txt将被错过处理,因为它只会从新文件(从7.txt)处理
我不确定我是否描述清楚问题,如果您有任何疑问,请问我
我找到了解决方案:
根据文档API:https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext
def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
创建一个输入流,用于监视 Hadoop 兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。
我们可以设置过滤功能来过滤文件< 4.txt
然后将“newFilesOnly”设置为 false