Spark Streaming找到了文件,但却声称找不到文件。

问题描述 投票:-1回答:1

我有下面的东西--它可以监控一个目录& 每X秒拉入一次日志。

我的问题是这样的。

  • 我设置脚本运行
  • 然后我在该目录下创建一个文件(比如说testfile.txt)。
  • 然后脚本出错说textfile.txt不存在。

它找到了文件和文件名,所以它确实存在,而且它找到了。

我可以看到的是,我在路径中定义了一个 file:/// 并返回一个错误,说它找不到 file:/. 所以好像少了两个/不知什么原因。

谢谢你的帮助!!!

编码

#only files after stream starts
df = spark_session\
    .readStream\
    .option('newFilesOnly', 'true')\
    .option('header', 'true')\
    .schema(myschema)\
    .text('file:///home/keenek1/analytics/logs/')\
    .withColumn("FileName", input_file_name())

错误

FileNotFoundException: File file:/home/keenek1/analytics/logs/loggywoggywoo.txt does not exist\
apache-spark pyspark apache-spark-sql spark-streaming
1个回答
1
投票

请将file:/改为hdfs:/。

df = spark_session\
    .readStream\
    .option('newFilesOnly', 'true')\
    .option('header', 'true')\
    .schema(myschema)\
    .text('hdfs://home/keenek1/analytics/logs/')\ # changed file:/// to hdfs://
    .withColumn("FileName", input_file_name())

对于下面的问题 如果同一个日志文件被覆盖,比如说每小时一次,检查点不会重新处理文件。我需要它说 "如果修改的时间改变了,重新处理"--这可能吗?

解决办法 将你的spark流媒体指向不同的目录&使用spark监听器从实际目录检查文件时间戳,如果文件时间戳有任何变化,将该文件移动到你的流媒体目录,并使用新的名称。

如果你想要代码,请告诉我,我可以给你scala的代码,可能你需要把它转换成python。

© www.soinside.com 2019 - 2024. All rights reserved.