我正在使用 FastAPI 在 python 中创建一个服务器,我想要一个与我的 API 无关的函数,每 5 分钟在后台运行一次(比如检查来自 API 的内容并根据响应打印内容)
我试图创建一个线程来运行函数
start_worker
,但它没有打印任何东西。
有人知道怎么做吗?
def start_worker():
print('[main]: starting worker...')
my_worker = worker.Worker()
my_worker.working_loop() # this function prints "hello" every 5 seconds
if __name__ == '__main__':
print('[main]: starting...')
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
_worker_thread = Thread(target=start_worker, daemon=False)
_worker_thread.start()
你应该在调用
uvicorn.run
之前启动你的线程,因为 uvicorn.run
正在阻塞线程。
import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
您也可以使用 FastAPI 的 startup 事件 启动您的线程,只要它可以在应用程序启动之前运行即可。
@app.on_event("startup")
async def startup_event():
t = BackgroundTasks()
t.start()
您可以为后台任务使用重复的Event scheduler,如下所示:
import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn
app = FastAPI()
s = sched.scheduler(time.time, time.sleep)
def print_event(sc):
print("Hello")
sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
s.enter(5, 1, print_event, (s,))
s.run()
@app.on_event("startup")
async def startup_event():
thread = Thread(target = start_scheduler)
thread.start()
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
如果您的任务是
async def
函数(有关 FastAPI 中 def
与 async def
端点/后台任务的更多详细信息,请参见这个答案),那么您可以将任务添加到当前事件循环,使用
asyncio.create_task()
功能。 create_task()
函数接受一个协程对象(即一个 async def
函数)并返回一个 Task
对象(如果需要,它可以用于 await
任务,或者 cancel
it 等) .该调用在事件循环内为当前线程创建任务,并在“后台”与事件循环中的所有其他任务同时执行,在 await
点之间切换。
需要在调用
create_task()
之前创建一个事件循环,并且在以编程方式(例如,使用 uvicorn
)或在终端中(例如,使用, uvicorn.run(app)
)。除了使用 uvicorn app:app
,还可以使用 asyncio.create_task()
获取当前事件
asyncio.get_running_loop()
,然后调用 loop
。下面的示例使用最近记录的方式添加
loop.create_task()
事件(使用上下文管理器),即应在应用程序启动之前以及应用程序关闭时执行的代码(参见
documentation更多细节)。还可以使用
lifespan
和 startup
事件,如前面的选项所示;但是,这些事件处理程序可能会从未来的 FastAPI/Starlette 版本中删除。shutdown
但是,如果你想改进它并删除多余的
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
async def print_task(s):
while True:
print('Hello')
await asyncio.sleep(s)
@asynccontextmanager
async def lifespan(app: FastAPI):
asyncio.create_task(print_task(5))
yield
app = FastAPI(lifespan=lifespan)
方法,你只需要将
start_scheduler
参数直接传递给带有sc
参数的print_event
方法,如下所示:kwargs