使用本地文件进行 Spark 流式处理(Python)

问题描述 投票:0回答:1

有没有办法扫描本地文件系统以查找特定文件夹中的更改,就像使用 HDFS 一样(GitHub 示例)?使用常规路径或带有

hdfs://
的 URI 运行它似乎可以工作,但使用前面带有
file://
的 URI 则不行。

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

XML_PATH = "file:///home/user/in"
APP_NAME = "StreamingTest"
BATCH_DURATION = 1 # in seconds

if __name__ == "__main__":
    sc = SparkContext("local[*]", appName=APP_NAME)
    ssc = StreamingContext(sc, BATCH_DURATION)
    lines = ssc.textFileStream(XML_PATH).pprint()
    ssc.start()
    ssc.awaitTermination()

奇怪的是,这似乎适用于特定文件。当

XML_PATH
更改为
"file:///home/user/in/test.txt"
时,如果文件存在,则输出是相同的。

-------------------------------------------
Time: 2016-01-14 16:04:34
-------------------------------------------

-------------------------------------------
Time: 2016-01-14 16:04:35
-------------------------------------------

但是当文件在流式传输期间被删除时,应用程序开始抛出异常

16/01/14 16:04:37 WARN FileInputDStream: Error finding new files
java.io.FileNotFoundException: File file:/home/user/in/test.txt does not exist

我认为应该表明它可以从本地目录读取。

我尝试将

XML_PATH
更改为
/tmp/in
(HDFS 上的目录)并在运行流时上传相同的文件,这似乎有效

-------------------------------------------
Time: 2016-01-14 16:13:12
-------------------------------------------

-------------------------------------------
Time: 2016-01-14 16:13:13
-------------------------------------------
The Project Gutenberg EBook of Ulysses, by James Joyce
subscribe to our email newsletter to hear about new eBooks.

-------------------------------------------
Time: 2016-01-14 16:13:14
-------------------------------------------
python hadoop apache-spark hdfs pyspark
1个回答
0
投票

您需要在代码运行时创建一个文件,然后它才会显示结果。 注意:请勿将文件复制或移动到流目录。

© www.soinside.com 2019 - 2024. All rights reserved.