如何在 Flask 中运行应用程序的同时运行命令?

问题描述 投票:0回答:1

我有一个脚本作为flask命令,我想在系统启动时运行它直到系统结束。基本上应该是一个线程。我已经创建了该命令并且运行良好。如何才能与系统同时运行呢? Flask_commands.py:

@click.command()
def file_watcher() -> None:
    context.interactor_type = 'flask command'

    event_handler = Handler()
    folder_path = core.settings['folder_path']
    observer = Observer()
    observer.schedule(event_handler, folder_path, recursive=True)

    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

file_watcher.py

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        logging.log(logging.INFO, f'New file created: {event.src_path}')
        print(f'New file created: {event.src_path}')
        ...
        print(datamodel)

应用程序.py

from ...


app = create_app("pb")

init_flask_cmds(app)
if __name__ == '__main__':
    port = core.settings.get('listening_port', 5000)
    app.run(
        debug=True,
        host="0.0.0.0",
        port=port
    )

我正在像这样运行应用程序:

flask run

python python-3.x flask watchdog
1个回答
0
投票

您可以在您的

app.py

中使用线程库和类似的东西
import threading
from ...
from flask_commands import file_watcher

app = create_app("pb")

init_flask_cmds(app)
if __name__ == '__main__':
    port = core.settings.get('listening_port', 5000)
    file_watcher_thread = threading.Thread(target=file_watcher, daemon=True)
    file_watcher_thread.run()
    app.run(
        debug=True,
        host="0.0.0.0",
        port=port
    )
    file_watcher_thread.join(1)

如果在 Linux 上,您还可以使用信号库将键盘中断发送到线程并使其终止,在 Windows 上信号可能会出现问题。但是,如果您不打算将该函数用作独立的函数,您也可以实现一个 threading.Event 来终止线程,如此处所示

另一个好方法是将清理添加到 Flask 应用程序的拆卸上下文中。您使用 this function 注册一个在应用程序上下文被销毁时运行的方法,以确保您的线程停止。但我自己从来没有这样做过。

© www.soinside.com 2019 - 2024. All rights reserved.