尝试打开文件并在n秒后重试的最佳实践是什么?
目前,我愿意:
import os
from os import path
import shutil
dir_path = path.abspath(path.join("foo", "bar"))
destination_path = path.abspath(path.join("foo", "dest_dir"))
for f in dir_path:
try:
# try to open the file if the file isn't used any longer
file_opened = open(f, 'r')
# move the file on successful opening
shutil.move(file_opened, destination_path)
file_opened.close()
except IOError:
return False
所以,目前我不处理该异常。我考虑创建额外的功能来打开文件,并在time.sleep(n)
上的excepth上调用该功能。但是,我确定还必须有其他东西...
我不使用
with open(f, 'rb') as file_opened:
do whatever`
编辑:
[一个进程创建了文件,我确定要完成文件写入/创建后,我希望Python进程移动文件。因此,我在上面的代码中添加了shutil.move以显示整个情况。
编辑:
请在下面找到我为解决该问题而开发的代码。我最后写了自己的自定义解决方案来处理它:
import os
from os import path
import psutil
from retry import retry
import shutil
from subprocess import check_output,Popen, PIPE
import glob
import time
class FileHandler:
def __init__(self, fn_source, dir_source):
self.file_source_name = fn_source
self.file_source_path = path.join(dir_source, self.file_source_name)
self.dir_dest_path = path.join(dir_source, "test")
self.file_dest_path = path.join(self.dir_dest_path, self.file_source_name)
def check_file(self):
if os.path.exists(self.file_source_path):
try:
os.rename(self.file_source_path, self.file_source_path)
print("file renamed")
return True
except:
print("can not rename the file..retrying")
time.sleep(1)
self.check_file()
else:
print("source file does not exist...retrying")
time.sleep(5)
self.check_file()
def check_destination(self):
if os.path.exists(self.file_source_path) and not os.path.exists(self.file_dest_path):
return True
elif os.path.exists(self.file_source_path) and os.path.exists(self.file_dest_path):
try:
print(self.file_dest_path, self.file_source_name)
os.remove(self.file_dest_path)
return True
except Exception as e:
print("can not remove the file..retrying")
time.sleep(5)
self.check_destination()
def move_file(self):
if self.check_destination():
print(self.file_source_path)
shutil.move(self.file_source_path, self.file_dest_path)
print("moved", str(self.file_source_path))
return True
else:
print("can not move the file..retrying")
time.sleep(1)
self.move_file()
def file_ops(self):
if self.check_file():
self.move_file()
else:
print("source does not exist")
time.sleep(1)
self.file_ops()
return True
def each_file_ops(fn, dir_source):
fh = FileHandler(fn, dir_source)
return fh.file_ops()
def main(dir_source):
dir_files = glob.glob(path.join(dir_source, '*.txt'))
if dir_files:
[each_file_ops(f, dir_source) for f in dir_files]
else:
print("source dir is empty")
time.sleep(1)
main(dir_source)
if __name__ == '__main__':
main(path.join(""))
您可以使用retry
模块进行此类重试。这使代码看起来更加简洁。 pip install retry
应该安装模块
from retry import retry
import shutil
@retry((FileNotFoundError, IOError), delay=1, backoff=2, max_delay=10, tries=100)
def attempt_to_move_file(fname, dest_path):
# Your attempt to move file
# shutil.move(fname, destination_path)
使用上面的代码,当我们按下attempt_to_move_file
或FileNotFoundError
时,将在重试IOError
时重试(最多100次尝试),并且重试发生在休眠状态1、2、4、8,尝试之间间隔10、10、10 ...秒]
您无需打开文件即可移动它。因为一旦打开文件后文件便处于打开状态,因此无法移动,请像这样在播放器中播放视频并尝试删除它或剪切粘贴系统后将其删除。
import shutil
file_name = 'trytry.csv'
shutil.copyfile(file_name, '/Users/umeshkaushik/PycharmProjects/newtry.csv')
shutil.move(file_name, '/Users/umeshkaushik/PycharmProjects/newtry1.csv')
以上代码运行正常。只要确保您提供正确的输入和输出路径即可。
您可以使用强度,因为重试未更新。