我有下面的东西--它可以监控一个目录& 每X秒拉入一次日志。
我的问题是这样的。
它找到了文件和文件名,所以它确实存在,而且它找到了。
我可以看到的是,我在路径中定义了一个 file:///
并返回一个错误,说它找不到 file:/
. 所以好像少了两个/不知什么原因。
谢谢你的帮助!!!
编码
#only files after stream starts
df = spark_session\
.readStream\
.option('newFilesOnly', 'true')\
.option('header', 'true')\
.schema(myschema)\
.text('file:///home/keenek1/analytics/logs/')\
.withColumn("FileName", input_file_name())
错误
FileNotFoundException: File file:/home/keenek1/analytics/logs/loggywoggywoo.txt does not exist\
请将file:/改为hdfs:/。
df = spark_session\
.readStream\
.option('newFilesOnly', 'true')\
.option('header', 'true')\
.schema(myschema)\
.text('hdfs://home/keenek1/analytics/logs/')\ # changed file:/// to hdfs://
.withColumn("FileName", input_file_name())
对于下面的问题 如果同一个日志文件被覆盖,比如说每小时一次,检查点不会重新处理文件。我需要它说 "如果修改的时间改变了,重新处理"--这可能吗?
解决办法 将你的spark流媒体指向不同的目录&使用spark监听器从实际目录检查文件时间戳,如果文件时间戳有任何变化,将该文件移动到你的流媒体目录,并使用新的名称。
如果你想要代码,请告诉我,我可以给你scala的代码,可能你需要把它转换成python。