我有一个脚本作为flask命令,我想在系统启动时运行它直到系统结束。基本上应该是一个线程。我已经创建了该命令并且运行良好。如何才能与系统同时运行呢? Flask_commands.py:
@click.command()
def file_watcher() -> None:
context.interactor_type = 'flask command'
event_handler = Handler()
folder_path = core.settings['folder_path']
observer = Observer()
observer.schedule(event_handler, folder_path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
file_watcher.py
class Handler(FileSystemEventHandler):
def on_created(self, event):
logging.log(logging.INFO, f'New file created: {event.src_path}')
print(f'New file created: {event.src_path}')
...
print(datamodel)
应用程序.py
from ...
app = create_app("pb")
init_flask_cmds(app)
if __name__ == '__main__':
port = core.settings.get('listening_port', 5000)
app.run(
debug=True,
host="0.0.0.0",
port=port
)
我正在像这样运行应用程序:
flask run
您可以在您的
app.py
中使用线程库和类似的东西
import threading
from ...
from flask_commands import file_watcher
app = create_app("pb")
init_flask_cmds(app)
if __name__ == '__main__':
port = core.settings.get('listening_port', 5000)
file_watcher_thread = threading.Thread(target=file_watcher, daemon=True)
file_watcher_thread.run()
app.run(
debug=True,
host="0.0.0.0",
port=port
)
file_watcher_thread.join(1)
如果在 Linux 上,您还可以使用信号库将键盘中断发送到线程并使其终止,在 Windows 上信号可能会出现问题。但是,如果您不打算将该函数用作独立的函数,您也可以实现一个 threading.Event 来终止线程,如此处所示
另一个好方法是将清理添加到 Flask 应用程序的拆卸上下文中。您使用 this function 注册一个在应用程序上下文被销毁时运行的方法,以确保您的线程停止。但我自己从来没有这样做过。