当我使用 fastapi_utilities 中的 Repeat_at 时,它显示“预期的协程,没有”

问题描述 投票:0回答:1

我正在编写一个 fastapi 应用程序,希望以分钟和每小时为单位进行安排,但是当我运行它时,它会显示:

\base_events.py", line 436, in create_task
    task = tasks.Task(coro, loop=self, name=name, context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: a coroutine was expected, got None

我的代码:

import requests
from fastapi_utilities import repeat_at
from fastapi_offline import FastAPIOffline
import asyncio

app = FastAPIOffline()

task = None

@repeat_at(cron="*/1 * * * *")  # every 1 minute
async def call_api():
    global task
    url = "http://sssssss/both/execute_query/4"
    try:
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(None, lambda: requests.get(url))
        response_json = response.json()
        print(response_json)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")

    if task is None:
        while True:
            #print("Running task...")
            await asyncio.sleep(0)  # This is a checkpoint where the task can be cancelled
            break
    return response_json

@app.get("/trigger-call-api")
async def trigger_call_api():
    global task
    if task is None:
        print('*************', call_api(), call_api)
        task = asyncio.create_task(call_api())
        return {"message": "call_api triggered"}
    else:
        return {"message": "call_api is already running"}
        

@app.get("/stop-call-api")
async def stop_call_api():
    global task
    if task is not None:
        #print('me@',dir(task))
        task.cancel()
        task = None
        #print('meXC',task, type(task), task.cancel(), task.cancelled(), task.cancelling())
    else:
        return {"message": "call_api is not running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="localhost", port=8001)

我正在弄清楚

call_api
None
并尝试修复它,但没有任何努力。

python cron fastapi coroutine schedule
1个回答
0
投票

根据@repeat_at

装饰器
的源代码,它不支持装饰后直接调用async def
函数,因为它返回的是普通的
def
函数。

您可以通过在

call_api

 周围添加一个包装器(用 
@repeat_at
 装饰)来解决此问题,这样您就可以继续正常使用未装饰的 
call_api

@repeat_at(cron="*/1 * * * *") # every 1 minute async def call_api_repeat(): await call_api() async def call_api(): global task url = "http://sssssss/both/execute_query/4" try: loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, lambda: requests.get(url)) response_json = response.json() print(response_json) except requests.exceptions.RequestException as e: print(f"Error fetching data: {e}") if task is None: while True: #print("Running task...") await asyncio.sleep(0) # This is a checkpoint where the task can be cancelled break return response_json
    
© www.soinside.com 2019 - 2024. All rights reserved.