读写多处理 postgresql 和 python

问题描述 投票:0回答:1

我正在尝试使用 python 的

postgresql
模块并行读取和写入
multiprocessing
数据库。我有两个进程,
read_database
write_database
。我正在使用
write_database
函数(在循环中)编写条目,如果我看到有特定数字的条目(在本例中为
60
),则并行读取该特定行并在内部完成一些处理写入时的
read_database
功能是并行进行的。这是我的示例代码:

import psycopg2
import multiprocessing as mp

def initialize_database(database_name):
    conn = psycopg2.connect(database=database_name,
                            user='user',
                            host='host_name',
                            password='password')
    return conn

def write_database(list_id, database_name, table_name, process):
    conn = initialize_database(database_name)
    cur = conn.cursor()
    for i in list_id:
        cur.execute("INSERT INTO "+str(table_name)+"(emp_id) VALUES(%s)",
                    (i,))
        conn.commit()
        print(str(process)+ ' writes into the database')

def read_database(num_mult, database_name, table_name, process):
    conn = initialize_database(database_name)
    cur = conn.cursor()
    cur.execute("select * from "+str(table_name)+" where emp_id = "
                +str(num_mult))
    data = cur.fetchall()
    print(data)
    cur.execute("update " + str(table_name) +
                " set emp_id = 1000 where emp_id = %s", (data[0][1],))
    conn.commit()
    print(str(process) + ' reads from the database')
if __name__ == '__main__':
    database_name = 'db_name'
    table_name = 'test_table'
    conn = initialize_database(database_name)
    try:
        cur = conn.cursor()
        cur.execute("DROP TABLE " + str(table_name))
        conn.commit()
        cur.close()
    except psycopg2.errors.UndefinedTable:
        print('A new table name ' + str(table_name) + ' is created')
    cur = conn.cursor()
    cur.execute(""" CREATE TABLE IF NOT EXISTS """ + str(table_name) + """(
                ID BIGSERIAL PRIMARY KEY,
                emp_id BIGINT);
                """)
    conn.commit()
    num_range = [i for i in range(0, 100)]
    p1 = mp.Process(target=write_database, args=(num_range, database_name, table_name, 'Process 1'))
    p2 = mp.Process(target=read_database, args=(60, database_name, table_name, 'Process 2'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

我有以下问题:

data
中的列表
read_database
超出范围。这是因为
data
empty,我使用
print
函数验证了这一点。如果并行阅读有效,则
emp_id
(即
60
)应更新为
1000
。我是不是错过了什么?

python postgresql multiprocessing
1个回答
0
投票

正如我提到的,我认为您不需要多重处理(除非您想并行执行插入并且不关心插入顺序)

以下是如何使用

threading

做到这一点
import psycopg2
import threading
import time

def get_connection():
    conn = psycopg2.connect(
        dbname="your_database_name",
        user="your_username",
        password="your_password",
        host="your_host",
        port="your_port",
    )
    return conn

def create_table():
    conn = get_connection()
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS test_table (
            id SERIAL PRIMARY KEY,
            value INTEGER
        )
    """
    )
    conn.commit()
    conn.close()


def insert_numbers():
    conn =get_connection()
    cur = conn.cursor()
    for i in range(1, 101):
        cur.execute("INSERT INTO test_table (value) VALUES (%s)", (i,))
        conn.commit()
        time.sleep(0.1)
    conn.close()


def check_table():
    conn =get_connection()
    cur = conn.cursor()
    while True:
        cur.execute("SELECT * FROM test_table WHERE value = %s", (50,))
        row = cur.fetchone()
        if row:
            print("Value 50 found!")
            break
        time.sleep(1)
    conn.close()


def main():
    create_table()

    check_thread = threading.Thread(target=check_table)
    insert_thread = threading.Thread(target=insert_numbers)

    check_thread.start()
    insert_thread.start()

    check_thread.join()
    insert_thread.join()

if __name__ == "__main__":
    main()

这将开始每 0.1 秒插入一次数字,并每 1 秒检查一次值 50。

© www.soinside.com 2019 - 2024. All rights reserved.