我正在学习Spark流式传输,但遇到了可能很简单的问题。我想从目录中提取整个文本文件。通常在这里提到的方法是wholeTextFile,而不是将文件按行分割的textFile。但是,据我发现,该方法在流上下文中不可用。
如何简单地实现类似的效果-流式传输时获取(文件名,整个文件内容)?
带有流上下文并且也具有sparksession的Scala示例将很棒。
我也在流上下文中搜索了wholeTextFile
,但在官方API中找不到任何内容。
尽管,我遇到了私有的WholeTextFileInputFormat
类,该类可以与fileStream
一起用于在(file path, file content)
元组上进行流式传输。但是,由于此类是私有的,因此不能直接使用。我的解决方案可能有点笨拙:
WholeTextFileInputFormat.scala
和WholeTextFileInputFormat.scala
从WholeTextFileRecordReader.scala
复制到您的项目中WholeTextFileRecordReader.scala
格式化程序用Apache Spark repository创建流这里是Scala中的一个示例,假设fileStream
是您的WholeTextFileInputFormat
。
ssc
Well OP自2017年以来可能再也没有问题了,但是我实际上看起来像这样,当我找到解决方案时,我将放弃,Spark 3将采用一种可以使用的格式。实现这一确切的目标。
StreamingContext
我的实现看起来与此类似
import org.apache.hadoop.io.Text
val directory = "/the/directory/to/watch"
val stream = ssc.fileStream[Text, Text, WholeTextFileInputFormat](directory)
这种方法对我有用,内容对象包含文件的实际内容,从那里您可以简单地将其转换为所需的任何最终对象。