Python 并行异步处理来自 iterable 的 api 调用

问题描述 投票:0回答:1

我正在为 Google Workspace 创建一个自动化脚本,它会获取某个组织单位的直接子级,然后同时获取所有这些 OU 的子级。在网上搜索多处理、线程或异步处理是否最适合我的答案时,我了解到 asyncio 将帮助我解决这个问题。我创建了一个类

Google Tenant
,它保存与 google api 和获取的用户的连接。 但是,我现在的问题是脚本仍然不是异步的,但它按顺序工作而不是异步调用

from google.oauth2 import service_account
from googleapiclient.discovery import build

import logging
import asyncio

class GoogleTenant:
    def __init__(self, api: str, version: str):
        config: ScriptConfig = ScriptConfig()
        credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"],
                                                                            scopes=SCOPES)
        delegated_credentials = credentials.with_subject(config["gcloud"]["subject"])
        self.service = build(api, version, credentials=delegated_credentials)
        self.users_list = []

    def fetch_users(self):
        users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
        ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
        asyncio.run(self._crawl_ous(ous_list))

    async def _crawl_ous(self, ous: list):
        crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous])
        for result in crawling_result:
            logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}")
            self.users_list.extend(result)

    async def _fetch_users_from_ou(self, ou):
        call_parameters = {
            "customer": "my_customer",
            "maxResults": 500,
            "projection": "basic",
            "query": f"orgUnitPath='{str(ou)}'",
            "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
        }
        logger.debug(f"Fetching users from {ou}")
        users_from_ou = self.service.users().list(**call_parameters).execute()
        user_fetching_result: list = users_from_ou["users"]
        logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
        if "nextPageToken" in users_from_ou:
            next_page_token = users_from_ou["nextPageToken"]
        else:
            return user_fetching_result
        while True:
            users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
            logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
            user_fetching_result.extend(users_from_ou["users"])
            if "nextPageToken" in users_from_ou:
                next_page_token = users_from_ou["nextPageToken"]
            else:
                return user_fetching_result

if __name__ == '__main__':
    google_tenant = GoogleTenant("admin", "directory_v1")
    google_tenant.fetch_users()

执行结果如下:

DEBUG:root:Fetching users from /example/child1
DEBUG:root:Initial fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 258
DEBUG:root:Fetching users from /example/child2
DEBUG:root:Initial fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
...

我尝试在某些地方输入await语句,但是我似乎误解了它应该如何根据我的理解await语句使函数在继续函数执行之前等待结果。 我怎样才能让Python同时执行这些?

更新1

我按照@Michael Butscher的建议重新格式化了部分代码,并在前一个块中添加了我的导入

    def fetch_users(self):
        users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
        ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
        logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
        asyncio.run(self._crawl_ous(ous_list))

    async def _crawl_ous(self, ous: list):
        tasks = [self._crawler_proxy(ou) for ou in ous]
        crawling_result = await asyncio.gather(*tasks)
        for result in crawling_result:
            logger.info(f"Crawling result: {len(result)}")
            self.users_list.extend(result)

    async  def _crawler_proxy(self, *args, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._fetch_users_from_ou(*args, **kwargs))
python asynchronous google-api google-api-python-client
1个回答
0
投票

你的罪魁祸首是协程

_fetch_users_from_ou
。它内部没有任何
await
语句,因此这意味着在该函数执行之前,它不允许任何其他任务在其间运行,因此所有任务都在
asyncio.gather
内的
_crawl_ous
调用中按顺序运行。

您希望进行这些更改:

users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters).execute)

users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters, pageToken=next_page_token).execute)

假设你的 python 版本高于 3.9 以及更多关于它的作用:: https://docs.python.org/3/library/asyncio-task.html#running-in-threads

© www.soinside.com 2019 - 2024. All rights reserved.