from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app)
上面的例子来自官方文档。
问题是如何在没有
web.run_app(app)
(Ctrl+C)的情况下终止 KeyBoardInterrupt
操作?
我看起来像:
await app.shutdown()
await app.cleanup()
但我不知道我可以把这个代码放在哪里以及如何使用它。
从源来看,它看起来是对
KeyboardInterrupt
和GracefulExit
的响应:
try:
asyncio.set_event_loop(loop)
loop.run_until_complete(main_task)
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
_cancel_tasks({main_task}, loop)
_cancel_tasks(asyncio.all_tasks(loop), loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
asyncio.set_event_loop(None)
由于您不想在手动中停止,因此您可以在自己的代码中引发
GracefulExit
,如下所示:
测试.py:
import signal
from aiohttp import web
from aiohttp.web_runner import GracefulExit
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def shutdown(request):
print("will shutdown now")
raise GracefulExit()
app = web.Application()
app.add_routes([web.get('/shutdown', shutdown),
web.get('/', handle),
web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app)
一旦你想关闭服务器,你可以发送
http://ip:8080/shutdown
到服务器。
另外,如果你想在收到请求后直接退出服务器,你也可以使用
signal.alarm(3)
,那么你不需要用/shutdown
定义处理程序。这意味着在收到任何请求后 3 秒后向程序发送警报(aiohttp 内部已经使用 loop.add_signal_handler(signal.SIGINT, _raise_graceful_exit)
注册了一个信号处理程序):
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
signal.alarm(3)
return web.Response(text=text)
不管怎样,关闭服务器的方式是
raise GracefulExit
或者send signal
,但是退出服务器的时机取决于你的场景。
根据@atline的回答我写了这段代码:
import signal
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import cpu_count
import requests
import time
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def shutdown(request):
print("will shutdown now")
raise GracefulExit()
app = web.Application()
app.add_routes([web.get('/shutdown', shutdown),
web.get('/', handle),
web.get('/{name}', handle)])
def init_aiohttp():
web.run_app(app, host="127.0.0.1", port="8080", ssl_context=None)
if __name__ == '__main__':
executor = ProcessPoolExecutor()
for i in range(0, 1):
aiohttp = executor.submit(init_aiohttp)
total_time = 0
while(True):
time.sleep(1)
total_time += 1
if total_time == 5:
try:
requests.get('http://127.0.0.1:8080/shutdown')
except Exception as e:
print(e)
print(total_time)
以上代码:
aiohttp
服务器。