So I have a Python script that reads a million or so rows from a CSV file. For each row, I make at most two, and at least one, SQL query against tables in Postgres. I want to introduce some parallelism to speed this up. I'm not an expert in this area yet, so I'd like to know when to use multithreading, multiprocessing, or asynchronous programming in Python. Here's the current synchronous code:
import csv

import psycopg2


def distribute_rows_to_files(file_path: str) -> None:
    exists_file = "exists_data.csv"
    c_not_exists_file = "c_not_exists_data.csv"
    i_not_exists_file = "i_not_exists_data.csv"
    exception_file = "exceptions.csv"
    # Open the output files before the loop to reduce overhead
    exists_file_handle = open(exists_file, mode='a', newline='')
    c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
    i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
    exception_handle = open(exception_file, mode='a', newline='')
    with psycopg2.connect(**AUTHENTICATOR_PG_DB_CREDENTIALS) as conn:
        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line
            count = 0
            for row in reader:
                count += 1
                if count == 100:  # Temporary cap while testing
                    break
                # Process each row here
                i_code, t_id, *_ = row
                try:
                    cur = conn.cursor()
                    query = """
                        SELECT customer_id
                        FROM buckets
                        WHERE i_code = %(i_code)s
                        LIMIT 1
                    """
                    cur.execute(query, {"i_code": i_code})
                    result = cur.fetchone()
                    cur.close()
                    if result:
                        try:
                            cur = conn.cursor()
                            second_query = """
                                SELECT EXISTS (
                                    SELECT 1
                                    FROM customers
                                    WHERE customer_id = %(customer_id)s
                                      AND t_id = %(t_id)s
                                )
                            """
                            cur.execute(second_query, {"customer_id": result[0], "t_id": t_id})
                            exists = cur.fetchone()[0]
                            cur.close()
                            file_handle = exists_file_handle if exists else c_not_exists_file_handle
                            writer = csv.writer(file_handle)
                            writer.writerow(row)
                        except Exception as e:
                            writer = csv.writer(exception_handle)
                            writer.writerow(row + [str(e)])
                    else:
                        writer = csv.writer(i_not_exists_file_handle)
                        writer.writerow(row)
                except Exception as e:
                    writer = csv.writer(exception_handle)
                    writer.writerow(row + [str(e)])
    exists_file_handle.close()
    c_not_exists_file_handle.close()
    i_not_exists_file_handle.close()
    exception_handle.close()
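Since the per-row work is almost entirely I/O (network round trips to Postgres), my first thought was a thread pool on top of the psycopg2 code. Here's a rough, untested sketch of that idea. I'm assuming psycopg2.pool.ThreadedConnectionPool, since a single psycopg2 connection shouldn't be shared across threads, plus a lock around the CSV writes because csv writers aren't thread-safe; the max_workers value is just a guess:

import csv
import threading
from concurrent.futures import ThreadPoolExecutor

import psycopg2.pool

write_lock = threading.Lock()  # csv writers aren't thread-safe


def process_row_threaded(row, pool, writers):
    i_code, t_id, *_ = row
    conn = pool.getconn()  # one connection per in-flight row
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT customer_id FROM buckets WHERE i_code = %(i_code)s LIMIT 1",
                {"i_code": i_code},
            )
            result = cur.fetchone()
            if result:
                cur.execute(
                    """
                    SELECT EXISTS (
                        SELECT 1 FROM customers
                        WHERE customer_id = %(customer_id)s AND t_id = %(t_id)s
                    )
                    """,
                    {"customer_id": result[0], "t_id": t_id},
                )
                key = "exists" if cur.fetchone()[0] else "c_not_exists"
            else:
                key = "i_not_exists"
        with write_lock:
            writers[key].writerow(row)
    except Exception as e:
        with write_lock:
            writers["exception"].writerow(row + [str(e)])
    finally:
        pool.putconn(conn)


def distribute_rows_threaded(file_path: str, max_workers: int = 8) -> None:
    # maxconn matches max_workers so getconn() never exhausts the pool
    pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=1, maxconn=max_workers, **AUTHENTICATOR_PG_DB_CREDENTIALS
    )
    handles = {
        "exists": open("exists_data.csv", mode='a', newline=''),
        "c_not_exists": open("c_not_exists_data.csv", mode='a', newline=''),
        "i_not_exists": open("i_not_exists_data.csv", mode='a', newline=''),
        "exception": open("exceptions.csv", mode='a', newline=''),
    }
    writers = {name: csv.writer(h) for name, h in handles.items()}
    with open(file_path, mode='r', newline='') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header line
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for row in reader:
                executor.submit(process_row_threaded, row, pool, writers)
    for handle in handles.values():
        handle.close()
    pool.closeall()

The with block around the executor waits for every submitted row to finish before the files are closed, and each worker writes its own result (or exception) to the right file, so nothing needs to be collected at the end.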
It would be great to hear the reasons for taking any one of these approaches, and what the code would look like! I've read some articles suggesting asyncpg over psycopg2 when working asynchronously, but that assumes going down the async route in the first place.
From the research I've done, you generally use async for I/O and blocking operations, and multithreading for more CPU-intensive work? Here's how the above could be implemented with asyncpg:
import csv
import asyncio

import asyncpg


async def process_row(row, pool, exists_file_handle, c_not_exists_file_handle,
                      i_not_exists_file_handle, exception_handle):
    i_code, t_id, *_ = row
    try:
        async with pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT customer_id
                FROM buckets
                WHERE i_code = $1
                LIMIT 1
            """, int(i_code))
            if result:
                exists = await conn.fetchval("""
                    SELECT EXISTS (
                        SELECT 1
                        FROM customers
                        WHERE customer_id = $1
                          AND t_id = $2
                    )
                """, result, t_id)
                file_handle = exists_file_handle if exists else c_not_exists_file_handle
                writer = csv.writer(file_handle)
                writer.writerow(row)
            else:
                writer = csv.writer(i_not_exists_file_handle)
                writer.writerow(row)
    except Exception as e:
        row.append(f"exception -- {e}")
        writer = csv.writer(exception_handle)
        writer.writerow(row)
        print(f"exception -- {e}")


async def distribute_rows_to_files(file_path: str) -> None:
    exists_file = "exists_data.csv"
    c_not_exists_file = "c_not_exists_data.csv"
    i_not_exists_file = "i_not_exists_data.csv"
    exception_file = "exceptions.csv"
    pool = await asyncpg.create_pool(**AUTHENTICATOR_PG_DB_CREDENTIALS)
    exists_file_handle = open(exists_file, mode='a', newline='')
    c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
    i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
    exception_handle = open(exception_file, mode='a', newline='')
    with open(file_path, mode='r', newline='') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header line
        tasks = []
        count = 0
        for row in reader:
            count += 1
            if count == 100:  # Temporary cap while testing
                break
            tasks.append(process_row(row, pool, exists_file_handle,
                                     c_not_exists_file_handle,
                                     i_not_exists_file_handle, exception_handle))
        await asyncio.gather(*tasks)
    for file_handle in (exists_file_handle, c_not_exists_file_handle,
                        i_not_exists_file_handle, exception_handle):
        file_handle.close()
    await pool.close()


csv_file = "CS_MemberMapping_all_added.csv"
asyncio.run(distribute_rows_to_files(csv_file))
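One thing I'm unsure about with this asyncpg version: for a million rows it builds every task up front before gathering, which holds the whole file's worth of tasks in memory at once. Would the usual fix be a fixed set of workers pulling from a bounded asyncio.Queue? A sketch of what I mean, reusing process_row from above (the num_workers value and queue size are just guesses):

import csv
import asyncio

import asyncpg


async def worker(queue: asyncio.Queue, pool, handles) -> None:
    # Each worker pulls rows until it is cancelled after queue.join()
    while True:
        row = await queue.get()
        try:
            await process_row(row, pool, *handles)
        finally:
            queue.task_done()


async def distribute_rows_bounded(file_path: str, num_workers: int = 50) -> None:
    pool = await asyncpg.create_pool(**AUTHENTICATOR_PG_DB_CREDENTIALS,
                                     max_size=num_workers)
    # Same four output files as before, in the order process_row expects
    handles = [open(name, mode='a', newline='') for name in
               ("exists_data.csv", "c_not_exists_data.csv",
                "i_not_exists_data.csv", "exceptions.csv")]
    # maxsize gives backpressure: reading the CSV pauses while workers catch up
    queue: asyncio.Queue = asyncio.Queue(maxsize=num_workers * 2)
    workers = [asyncio.create_task(worker(queue, pool, handles))
               for _ in range(num_workers)]
    with open(file_path, mode='r', newline='') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header line
        for row in reader:
            await queue.put(row)  # blocks when the queue is full
    await queue.join()  # wait until every queued row is processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    for handle in handles:
        handle.close()
    await pool.close()

This way memory stays roughly constant regardless of file size, and the pool's max_size matches the worker count so no task ever waits on a connection longer than it has to.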