我尝试通过异步方式优化 Python 代码。为此,我尝试了 asyncio 和并发.futures 库。
这是我的代码:
async def get_rds_instances(session, region, engine_types):
report_rds = []
mandatory_tags = {'Use-Case'}
client = session.client('rds', region_name=region)
try:
await asyncio.sleep(1)
response = client.describe_db_instances()
rds_report.append(response)
except (ClientError, Exception) as e:
print(e)
return reportd_rds
async def main():
... some arguments definition
session = get_rds_session(profile_name)
regions = session.get_available_regions('rds')
try:
reports = await asyncio.gather(*[get_rds_instances(session=session, region=region, engine_types=engine_types) for region in regions])
except Exception as e:
print(f"An error occurred: {str(e)}")
... process report
if __name__ == "__main__":
asyncio.run(main())
如果没有 asyncio,此代码将在大约 22 秒内完成。使用 asyncio,它在大约 20 秒内完成。
但是,事情变得有趣了,我使用了并发.futures:
def get_rds_instances(session, region, engine_types):
report_rds = []
mandatory_tags = {'Use-Case'}
client = session.client('rds', region_name=region)
try:
response = client.describe_db_instances()
rds_report.append(response)
except (ClientError, Exception) as e:
print(e)
return reportd_rds
def main():
... some arguments definition
session = get_rds_session(profile_name)
regions = session.get_available_regions('rds')
try:
args = ((session, region, engine_types) for region in regions)
with concurrent.futures.ThreadPoolExecutor() as executor:
reports = executor.map(lambda p: get_rds_instances(*p), args)
except Exception as e:
print(f"An error occurred: {str(e)}")
... process report
if __name__ == "__main__":
main()
这段代码在大约 3 秒内完成。
所以,我的问题是,这种差异正常吗?我是否遗漏了 asyncio 的某些内容或为 asyncio 做错了什么?
编辑:
谢谢!
与我的同事交谈后,describe_db_instances 是可阻止的函数,但为每个区域创建多个线程确实有帮助,就像 @user4815162342 提供的here。所以这里我的代码发生了变化:
def get_session(profile):
session = boto3.Session(profile_name=profile)
return session
def get_clients(session):
regions = session.get_available_regions('rds')
clients = [session.client('rds', region_name=region) for region in regions]
return clients
async def get_instances(client, engine_types):
output = []
loop = asyncio.get_event_loop()
try:
response = await loop.run_in_executor(None, client.describe_db_instances)
output.append(response['DBInstances'])
except (ClientError, Exception) as e:
print(e)
return output
async def main():
... some arguments definition
session = get_session(profile_name)
clients = get_clients(session)
reports = await asyncio.gather(*[get_instances(client, engine) for client in clients])
... process report
if __name__ == "__main__":
asyncio.run(main())