异步发送 HTTP POST,但立即向服务器发出请求

问题描述 投票:0回答:1

我正在尝试使用 asyncio 和 aiohttp 库异步发送 HTTP POST,但我想立即向服务器发出请求。在下面的代码中,生产者和消费者通过队列不断进行通信。当消费者遇到满足特定条件(在本例中为偶数条件)的数据时,它会尝试向服务器发送 POST 请求。

代码中有两个方法:#METHOD 1 和 #METHOD 2。但是,两者都存在一些问题。使用 # 方法 1,功能可以正常工作,但存在一个问题,即消费者在收到 POST 结果之前会被阻止(生产者没有被阻止,但消费者被阻止)。我想要的是消费者继续独立处理生产者产生的数据,即使到服务器的 POST 正在等待。

所以,我尝试了#方法2,但这里也有一个问题。 # 方法2 使用create_task 创建一个独立的任务来处理POST 请求。问题是create_task只调度事件循环中的任务;它不保证立即执行。

总之,我想立即执行向服务器发送POST请求(只需发送),但使用create_task异步处理服务器对POST响应的验证。如果 aiohttp 库无法做到这一点,我可能需要探索其他库。有办法实现我想要的吗?

import asyncio
import aiohttp
import random


async def my_send_post(session, url, headers, params):
    resp = await session.post(url=url, headers=headers, json=params)
    resp = await resp.json()
    print(resp)
    return


async def producer(queue):
    x = 0
    while True:
        # produce an item
        print(f'producing {x}')
        # simulate i/o operation using sleep
        await asyncio.sleep(1)
        # put the item in the queue
        await queue.put(x)
        x += 1


async def consumer(queue):
    session = aiohttp.ClientSession()
    url = 'https://api.testurl.com/'
    params = {}
    headers = request_headers(params)     # just get headers using other library
    while True:
        # wait for an item from the producer
        item = await queue.get()

        if item %2 == 0:
            # METHOD 1
            resp = await session.post(url=url, headers=headers, json=params)
            resp = await resp.json()
            print(resp)

            # METHOD 2
            asyncio.create_task(my_send_post(session=session, url=url, headers=headers, params=params))

        else:
            print(f"consuming odd number{item}")


async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))


if __name__ == "__main__":
    asyncio.run(main())`
post get python-asyncio aiohttp
1个回答
0
投票

问题是create_task只调度事件循环中的任务;它不保证立即执行。

没有任何方法可以保证这一点,除非您知道应用程序中的所有变量,但您应该能够足够接近。

首先要了解的是,create_task() 安排任务在循环的下一次迭代中运行(它显然不会立即执行任何内容,因为没有

await
以便屈服于事件循环)。

接下来要理解的是,

asyncio.sleep(0)
可用于让出事件循环进行 1 次迭代。

那么这只是当前正在运行哪些任务的问题。如果只有 2 个任务正在运行,那么我们知道在屈服于事件循环后,新任务将是下一个运行的任务,因此我们可以确定它会立即执行。

如果还有其他任务在运行,那么可能需要更长的时间(即它需要完成当前迭代中任何剩余任务的一个步骤,然后是在它之前安排的下一次迭代中的任何任务。但是,我们知道它将在当前任务恢复之前运行)。

这是一个非常详细的解释,但基本上,你可能只想这样做:

t = asyncio.create_task(...)
await asyncio.sleep(0)

此外,顺便说一句,您应该在某些时候保留对任务的引用并对其进行

await
,否则您可能会丢失异常(在使用调试模式时可能会收到有关此问题的警告(
python -X dev
))。如果您想轻松地将任务扔到后台,我建议使用 aiojobs (尽管 asyncio 本身可能很快就会有一些东西来处理这个问题)。

© www.soinside.com 2019 - 2024. All rights reserved.