并发 Future 与 Asyncio 的区别

问题描述 投票:0回答:1

我尝试通过异步方式优化 Python 代码。为此,我尝试了 asyncio 和并发.futures 库。

这是我的代码:

async def get_rds_instances(session, region, engine_types):
    report_rds = []
    mandatory_tags = {'Use-Case'}

    client = session.client('rds', region_name=region)
        try:
            await asyncio.sleep(1)
            response = client.describe_db_instances()
            rds_report.append(response)
        except (ClientError, Exception) as e:
            print(e)
    return reportd_rds

async def main():
... some arguments definition
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        reports = await asyncio.gather(*[get_rds_instances(session=session, region=region, engine_types=engine_types) for region in regions])
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    
    ... process report

if __name__ == "__main__":
    asyncio.run(main())

如果没有 asyncio,此代码将在大约 22 秒内完成。使用 asyncio,它在大约 20 秒内完成。

但是,事情变得有趣了,我使用了并发.futures:

def get_rds_instances(session, region, engine_types):
    report_rds = []
    mandatory_tags = {'Use-Case'}

    client = session.client('rds', region_name=region)
        try:
            response = client.describe_db_instances()
            rds_report.append(response)
        except (ClientError, Exception) as e:
            print(e)
    return reportd_rds


def main():
... some arguments definition
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        args = ((session, region, engine_types) for region in regions)
        with concurrent.futures.ThreadPoolExecutor() as executor:
        reports = executor.map(lambda p: get_rds_instances(*p), args)
    except Exception as e:
        print(f"An error occurred: {str(e)}")

    ... process report

if __name__ == "__main__":
    main()

这段代码在大约 3 秒内完成。

所以,我的问题是,这种差异正常吗?我是否遗漏了 asyncio 的某些内容或为 asyncio 做错了什么?

编辑:

描述数据库实例

谢谢!

python asynchronous python-asyncio concurrent.futures
1个回答
0
投票

与我的同事交谈后,describe_db_instances 是可阻止的函数,但为每个区域创建多个线程确实有帮助,就像 @user4815162342 提供的here。所以这里我的代码发生了变化:

def get_session(profile):
    session = boto3.Session(profile_name=profile)
    return session


def get_clients(session):
    regions = session.get_available_regions('rds')
    clients = [session.client('rds', region_name=region) for region in regions]
    return clients


async def get_instances(client, engine_types):
    output = []
    loop = asyncio.get_event_loop()

    try:
        response = await loop.run_in_executor(None, client.describe_db_instances)
        output.append(response['DBInstances'])
    except (ClientError, Exception) as e:
        print(e)
    return output


async def main():
... some arguments definition
    session = get_session(profile_name)
    clients = get_clients(session)

    reports = await asyncio.gather(*[get_instances(client, engine) for client in clients])

... process report

if __name__ == "__main__":
    asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.