使用aiohttp的Python 3.6异步GET请求正在同步运行

问题描述 投票:4回答:2

我有以下功能正常,但由于某种原因,请求似乎是同步执行,而不是异步。

我现在的假设是,由于主函数中的for record in records for循环,这种情况正在发生,但我不知道如何更改这个以便请求可以执行异步。如果不是这样,我还需要改变什么呢?

async def do_request(query_string):
        base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
        params = {'key': google_api_key,
                  'query': query_string}
        async with aiohttp.ClientSession() as session:
            async with session.request('GET', base_url, params=params) as resp:
                return resp


async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        resp = await do_request(query_string)
        print("NOW WRITE TO DATABASE")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
python python-3.x http python-asyncio aiohttp
2个回答
6
投票

您正在等待单独的do_request()呼叫。而不是直接等待它们(在协程完成之前阻塞它们),使用asyncio.gather() function让事件循环同时运行它们:

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        requests.append(do_request(query_string))

    for resp in asyncio.gather(*requests):
        print("NOW WRITE TO DATABASE")

asyncio.gather()返回值是协同程序返回的所有结果的列表,与您将它们传递给gather()函数的顺序相同。

如果您需要原始记录来处理响应,您可以通过几种不同的方式配对记录和查询字符串:

  • 将有效记录存储在单独的列表中,并在处理响应时使用zip()将它们再次配对
  • 使用一个帮助程序,它获取有效记录,生成一个查询字符串,调用请求,并将记录和响应一起作为元组返回。

您还可以将响应处理混合到聚集的协程中;一个记录,产生查询字符串,等待do_request,然后在响应准备好时将结果存储在数据库中。

换句话说,将你需要连续发生的工作分成协同程序并收集它们。


2
投票

建立起来或Martijn的答案

如果请求的顺序对您来说无关紧要(当它写入数据库时​​),您可以在获取命令时将响应写入数据库。

编辑(解释更多):我在这里使用2个信号量。 1是通过aiohttp限制连接数。这取决于您的系统。大多数Linux系统默认为1024.根据我自己的个人经验,将其设置为低于OS max是更可取的。

max_coroutines是为了解决有太多协同程序同时运行的问题。

我使用asyncio.ensure_future(),以便在构建列表时运行协同程序。这样,在执行任何协同程序之前,您不会创建完整的协程列表。

# Limit the total number of requests you make by 512 open connections.
max_request_semaphore = asyncio.BoundedSemaphore(512)
max_coroutines = asyncio.BoundedSemaphore(10000)


async def process_response(response):
    print('Process your response to your database')


async def do_request(query_string):
    base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
    params = {'key': google_api_key,
              'query': query_string}
    async with max_request_semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.request('GET', base_url, params=params) as resp:
                return resp


# Excuse me for the bad function naming
async do_full_request(query_string):
    resp = await do_request(query_string)
    await process_response(resp)
    max_coroutines.release()

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        # Will prevent more than 10k coroutines created.
        await max_coroutines.acquire()
        requests.append(
            asyncio.ensure_future(
                do_full_request(query_string)))

    # Now gather all the coroutines
    await asyncio.gather(*requests)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
© www.soinside.com 2019 - 2024. All rights reserved.