Async programming vs. multithreading for many reads from Postgres

Problem description (0 votes, 1 answer)

So I have a Python script that will read roughly a million rows from a CSV file. For each row of data I will run at most two, and at least one, SQL queries against tables in Postgres. I want to introduce some parallelism to speed this up. I'm not an expert in this area yet, so I'd like to know when to use multithreading, multiprocessing, or async programming in Python. Below is the current synchronous code:

import csv
import psycopg2

def distribute_rows_to_files(file_path: str) -> None:
    exists_file = "exists_data.csv"
    c_not_exists_file = "c_not_exists_data.csv"
    i_not_exists_file = "i_not_exists_data.csv"
    exception_file = "exceptions.csv"


    # Open the files before the loop to reduce overhead
    exists_file_handle = open(exists_file, mode='a', newline='')
    c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
    i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
    exception_handle = open(exception_file, mode='a', newline='')


    with psycopg2.connect(**AUTHENTICATOR_PG_DB_CREDENTIALS) as conn:
        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line


            count = 0
            for row in reader:
                count += 1
                if count == 100:
                    break


                # Process each row here
                i_code, t_id, __, ___, ____ = row
                try:
                    cur = conn.cursor()
                    query = """
                        SELECT customer_id
                        FROM buckets
                        WHERE i_code = %(i_code)s
                        LIMIT 1
                    """
                    cur.execute(query, {"i_code": i_code})
                    result = cur.fetchone()
                    cur.close()


                    if result:
                        try:
                            cur = conn.cursor()
                            second_query = """
                                SELECT EXISTS (
                                    SELECT 1
                                    FROM customers
                                    WHERE customer_id = %(customer_id)s
                                    AND t_id = %(t_id)s
                                )
                            """
                            cur.execute(second_query, {"customer_id": result[0], "t_id": t_id})
                            exists = cur.fetchone()[0]
                            cur.close()


                            file_handle = exists_file_handle if exists else c_not_exists_file_handle
                            writer = csv.writer(file_handle)
                            writer.writerow(row)
                        except Exception as e:
                            row_with_exception = row + [str(e)]
                            writer = csv.writer(exception_handle)
                            writer.writerow(row_with_exception)
                    else:
                        writer = csv.writer(i_not_exists_file_handle)
                        writer.writerow(row)


                except Exception as e:
                    row_with_exception = row + [str(e)]
                    writer = csv.writer(exception_handle)
                    writer.writerow(row_with_exception)


    exists_file_handle.close()
    c_not_exists_file_handle.close()
    i_not_exists_file_handle.close()
    exception_handle.close()

It would be great to hear the reasoning behind choosing any of these approaches and what the code would look like! I've read some articles suggesting asyncpg instead of psycopg2 when working asynchronously, but that assumes going down the async programming route.
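For the multithreading route specifically, a minimal sketch could pair concurrent.futures.ThreadPoolExecutor with psycopg2's ThreadedConnectionPool so each worker borrows its own connection. This is only an illustration under assumptions: the table, column, and credential names are taken from the question, while the pool size, worker count, and the lookup/process_file names are made up for the example.

from concurrent.futures import ThreadPoolExecutor
import csv
from psycopg2 import pool

# Shared connection pool; a size of 8 is an illustrative guess, not a tuned value
pg_pool = pool.ThreadedConnectionPool(1, 8, **AUTHENTICATOR_PG_DB_CREDENTIALS)

def lookup(row):
    """Run the one or two read-only queries for a CSV row on a borrowed connection."""
    i_code, t_id, *_ = row
    conn = pg_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT customer_id FROM buckets WHERE i_code = %(i_code)s LIMIT 1",
                {"i_code": i_code},
            )
            found = cur.fetchone()
            if not found:
                return row, "i_not_exists"
            cur.execute(
                "SELECT EXISTS (SELECT 1 FROM customers"
                " WHERE customer_id = %(customer_id)s AND t_id = %(t_id)s)",
                {"customer_id": found[0], "t_id": t_id},
            )
            return row, "exists" if cur.fetchone()[0] else "c_not_exists"
    finally:
        pg_pool.putconn(conn)

def process_file(file_path: str) -> None:
    handles = {
        "exists": open("exists_data.csv", mode='a', newline=''),
        "c_not_exists": open("c_not_exists_data.csv", mode='a', newline=''),
        "i_not_exists": open("i_not_exists_data.csv", mode='a', newline=''),
    }
    writers = {label: csv.writer(handle) for label, handle in handles.items()}
    with open(file_path, mode='r', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # Skip the header line
        with ThreadPoolExecutor(max_workers=8) as executor:
            # Queries run in worker threads; writing stays in this thread,
            # so the output file handles never need locking
            for row, label in executor.map(lookup, reader):
                writers[label].writerow(row)
    for handle in handles.values():
        handle.close()

Two caveats with this sketch: routing failed rows to exceptions.csv is left out for brevity, and executor.map submits every row up front, so for the full million-row file you would likely want to feed it in chunks.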

python postgresql parallel-processing python-asyncio
1 Answer
0 votes

So I did some research, and the usual guidance is to use asyncio for I/O-bound and blocking operations, and multiprocessing for CPU-bound work. Here is how you could implement the above using asyncpg:

import csv
import asyncio
import asyncpg



async def process_row(row, pool, exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle):
    i_code, t_id, __, ___, ____ = row
    try:
        async with pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT customer_id
                FROM buckets
                WHERE i_code = $1
                LIMIT 1
            """, int(i_code))

            if result:
                exists = await conn.fetchval("""
                    SELECT EXISTS (
                        SELECT 1
                        FROM customers
                        WHERE customer_id = $1
                        AND t_id = $2
                    )
                """, result, t_id)

                file_handle = exists_file_handle if exists else c_not_exists_file_handle
                writer = csv.writer(file_handle)
                writer.writerow(row)
            else:
                writer = csv.writer(i_not_exists_file_handle)
                writer.writerow(row)
    except Exception as e:
        row.append(f"exception -- {e}")
        writer = csv.writer(exception_handle)
        writer.writerow(row)
        print(f"exception -- {e}")

async def distribute_rows_to_files(file_path: str) -> None:
    exists_file = "exists_data.csv"
    c_not_exists_file = "c_not_exists_data.csv"
    i_not_exists_file = "i_not_exists_data.csv"
    exception_file = "exceptions.csv"

    pool = await asyncpg.create_pool(**AUTHENTICATOR_PG_DB_CREDENTIALS)

    exists_file_handle = open(exists_file, mode='a', newline='')
    c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
    i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
    exception_handle = open(exception_file, mode='a', newline='')

    with open(file_path, mode='r', newline='') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header line

        tasks = []
        count = 0
        for row in reader:
            count += 1
            if count == 100:
                break
            task = process_row(row, pool, exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle)
            tasks.append(task)

        await asyncio.gather(*tasks)

    for file_handle in (exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle):
        if file_handle:
            file_handle.close()

    await pool.close()

csv_file = "CS_MemberMapping_all_added.csv"
asyncio.run(distribute_rows_to_files(csv_file))
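One caveat with the version above: asyncio.gather is handed one task per CSV row, so with a million rows you would be building and holding a million pending coroutines while they all queue on the connection pool. A minimal sketch of processing the file in fixed-size batches instead; CHUNK_SIZE and the gather_in_chunks name are illustrative assumptions, and process_row is the coroutine defined above:

CHUNK_SIZE = 500  # illustrative batch size, not a tuned value

async def gather_in_chunks(reader, pool, handles):
    """Run process_row over the CSV in batches so only one batch is in flight at a time."""
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) >= CHUNK_SIZE:
            await asyncio.gather(*(process_row(r, pool, *handles) for r in chunk))
            chunk = []
    if chunk:  # flush the final partial batch
        await asyncio.gather(*(process_row(r, pool, *handles) for r in chunk))

Inside distribute_rows_to_files this would replace the tasks list, e.g. await gather_in_chunks(reader, pool, (exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle)); asyncpg's pool still caps how many queries actually run against the database at once.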