Running out of memory with asyncio and pandas.to_sql()

Question (votes: 0, answers: 1)

I'm trying to fetch data from an API and save it to a table in SQL Server concurrently, but I'm running out of memory. I believe the cause is that I await all of the get tasks before any of the save tasks start, so every fetched result is held in memory at once. Below is my code:

    import asyncio
    import polars as pl
    import pandas as pd
    import itertools

    async def save_to_land(transactions, table_name, conn_object, semaphore):
        def process_and_save(transactions):
            df = pl.DataFrame(transactions, infer_schema_length=2000)
            # some processing with polars
            # ...
            df.to_pandas().to_sql(table_name, conn_object, if_exists='append', index=False)

        if transactions:
            async with semaphore:
                # run sync code in separate thread to not block async event loop
                await asyncio.to_thread(process_and_save, transactions)

    async def get_transactions(api, practice_id, year, incremental, with_deleted, date_modified, semaphore):
        async with semaphore:
            transactions = await api.get_transactions(practice_id, year, incremental=incremental,
                                                      with_deleted=with_deleted, date_modified=date_modified)
            await asyncio.sleep(0.5)
            return transactions

    async def get_transactions_and_save_to_land(api, practice_ids, table_name, conn_object, start_year,
                                                end_year, incremental, with_deleted, date_modified,
                                                batch_size=100000):
        # for concurrent api requests
        sem1 = asyncio.Semaphore(4)
        # for concurrent db writes
        sem2 = asyncio.Semaphore(3)

        if not incremental:
            get_tasks = []
            for practice_id, year in itertools.product(practice_ids, range(start_year, end_year + 1)):
                task = asyncio.create_task(get_transactions(api, practice_id, year, incremental=incremental,
                                                            with_deleted=with_deleted,
                                                            date_modified=date_modified, semaphore=sem1))
                get_tasks.append(task)
            results = await asyncio.gather(*get_tasks)

            save_tasks = []
            batch_transactions = []
            for transactions in results:
                batch_transactions.extend(transactions)
                if len(batch_transactions) >= batch_size:
                    task = asyncio.create_task(save_to_land(batch_transactions, table_name, conn_object,
                                                            semaphore=sem2))
                    save_tasks.append(task)
                    batch_transactions = []
            # for any remaining transactions
            if batch_transactions:
                save_tasks.append(asyncio.create_task(save_to_land(batch_transactions, table_name,
                                                                   conn_object, semaphore=sem2)))
            await asyncio.gather(*save_tasks)

How can I fix this? Can someone more experienced with asyncio advise?
Also, please note that I use asyncio.to_thread() to run the pandas.to_sql() part, since it is blocking code.
    

python pandas async-await python-asyncio
1 answer
0 votes

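You might want to consider asyncio.as_completed. It yields each get task as soon as it finishes, so a batch can be handed to the save step without first waiting for asyncio.gather to return every result. Only the end of your code would need to change.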
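A minimal sketch of how asyncio.as_completed could be applied to the batching loop, offered as an illustration rather than the answerer's original snippet. The names fetch, write_batch, save and fetch_and_save are hypothetical stand-ins: fetch replaces api.get_transactions, and write_batch replaces the blocking polars/pandas to_sql() step so the example runs without a database.

```python
import asyncio
import itertools


async def fetch(practice_id, year, semaphore):
    # Hypothetical stand-in for api.get_transactions(...): returns 3 fake rows.
    async with semaphore:
        await asyncio.sleep(0)
        return [{"practice_id": practice_id, "year": year, "n": i} for i in range(3)]


def write_batch(batch, sink):
    # Hypothetical stand-in for the blocking to_sql() call: records the batch size.
    sink.append(len(batch))


async def save(batch, sink, semaphore):
    async with semaphore:
        # Run the blocking write in a worker thread, as in the question.
        await asyncio.to_thread(write_batch, batch, sink)


async def fetch_and_save(practice_ids, years, sink, batch_size=5):
    fetch_sem = asyncio.Semaphore(4)  # concurrent API requests
    save_sem = asyncio.Semaphore(3)   # concurrent DB writes

    get_tasks = [
        asyncio.create_task(fetch(practice_id, year, fetch_sem))
        for practice_id, year in itertools.product(practice_ids, years)
    ]

    save_tasks = []
    batch = []
    # Consume results in completion order instead of waiting for all of them.
    for finished in asyncio.as_completed(get_tasks):
        batch.extend(await finished)
        if len(batch) >= batch_size:
            save_tasks.append(asyncio.create_task(save(batch, sink, save_sem)))
            batch = []  # start a new list; the old one now belongs to the save task

    # Flush any remaining rows.
    if batch:
        save_tasks.append(asyncio.create_task(save(batch, sink, save_sem)))

    await asyncio.gather(*save_tasks)
```

With this shape, saving overlaps with fetching, and each batch can be released once its write finishes. It does not put a hard cap on memory, though: if writes are much slower than fetches, pending batches can still accumulate, and some additional form of backpressure may be needed.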
