我正在尝试同时从 API 获取数据并将数据保存到 SQL Server 中的表中。但我遇到了内存问题。我相信原因是因为我在
await
保存任务之前先获取要完成的任务。下面是我的代码:await
我该如何解决这个问题?有对 asyncio 更有经验的人可以建议吗? 另外,请注意,我使用
import asyncio
import polars as pl
import pandas as pd
import itertools
async def save_to_land(transactions, table_name, conn_object, semaphore):
def process_and_save(transactions):
df = pl.DataFrame(transactions, infer_schema_length=2000)
# some processing with polars
# ...
df.to_pandas().to_sql(table_name, conn_object, if_exists='append', index=False)
if transactions:
async with semaphore:
# run sync code in separate thread to not block async event loop
await asyncio.to_thread(process_and_save, transactions)
async def get_transactions(api, practice_id, year, incremental, with_deleted, date_modified, semaphore):
async with semaphore:
transactions = await api.get_transactions(practice_id, year, incremental=incremental, with_deleted=with_deleted, date_modified=date_modified)
await asyncio.sleep(0.5)
return transactions
async def get_transactions_and_save_to_land(api, practice_ids, table_name, conn_object, start_year, end_year, incremental, with_deleted, date_modified, batch_size=100000):
# for concurrent api requests
sem1 = asyncio.Semaphore(4)
# for concurrent db writes
sem2 = asyncio.Semaphore(3)
if not incremental:
get_tasks = []
for practice_id, year in itertools.product(practice_ids, range(start_year, end_year + 1)):
task = asyncio.create_task(get_transactions(api, practice_id, year, incremental=incremental, with_deleted=with_deleted, date_modified=date_modified, semaphore=sem1))
get_tasks.append(task)
results = await asyncio.gather(*get_tasks)
save_tasks = []
batch_transactions = []
for transactions in results:
batch_transactions.extend(transactions)
if len(batch_transactions) >= batch_size:
task = asyncio.create_task(save_to_land(batch_transactions, table_name, conn_object, semaphore=sem2))
save_tasks.append(task)
batch_transactions = []
# for any remaining transactions
if batch_transactions:
save_tasks.append(asyncio.create_task(save_to_land(batch_transactions, table_name, conn_object, semaphore=sem2)))
await asyncio.gather(*save_tasks)
来运行
asyncio.to_thread()
部分,因为它会阻止代码。您可能需要考虑 asyncio.as_completed。你的代码的结尾就是:
pandas.to_sql()