如何在Python中使用特定方法创建可等待的对象

问题描述 投票:0回答:1

有一个基于 aiohttp 的服务器,看起来像:

import aiohttp
routes = web.RouteTableDef()
@routes.post('/route1')
async def route1(request):
    req_json = await request.json()
    step1 = func1(req_json)
    step2 = func2(step1)
    return web.Response(body=step2, status=200)

我想在本地测试该服务器而不运行整个服务器,特别是route1中的step1和step2函数。

所以我这样做:

import asyncio
from myserver import route1
json_request = {"some": "data"}
loop = asyncio.get_event_loop()
loop.run_until_complete(route1(json_request))

问题是,运行本地测试时我必须将

req_json = await request.json()
替换为
req_json = request
。 如何使用 .json() 方法创建可等待变量来代替
json_request
dict?

python-3.x aiohttp
1个回答
0
投票

要使这一行正常工作:

await request.json()
,您实际上并不需要等待
request
对象。它可以是一个具有
.json()
属性的普通对象,该属性 “is” 是一个可等待的对象。协程将是一个合理的选择。

像这样:

import asyncio
from myserver import route1

class CustomRequest:
    def __init__(self, data: dict) -> None:
        self.data = data

    async def json(self) -> dict:
        return self.data
        
json_request = {"some": "data"}
request_object = CustomRequest(json_request)
asyncio.run(route1(request_object))
© www.soinside.com 2019 - 2024. All rights reserved.