I have a Python script that inserts data into a Postgres database using the Psycopg2 library. The tables were created with Django migrations. Although the database is used by Django, it is part of a data-analysis system and is also accessed and manipulated with Psycopg2. Everything runs on Ubuntu. Below is a simplified version of the database.
Django receives zip files through POST requests and adds a corresponding entry to the Upload table; each entry holds the location of a zip file. Inside each upload zip there are session zips, which in turn contain CSV files with the relevant data. The session zips and CSV files have no references in the database, but their information is inserted by the script mentioned above. My full system has more tables, but for this question a Data table per session is enough: each session zip should contain 2 CSVs, one with the data and the other with the session metadata.
The script's loop is given below. Basically, for each Upload zip, its sessions are extracted and inserted into the database one by one. A Session and its corresponding Data are inserted in a single transaction. Since Data has a foreign key referencing Session, that field has to be deferred; the constraint is declared DEFERRABLE INITIALLY DEFERRED. The session ID to use as primary key is computed by incrementing the largest existing session ID. Sometimes the received data is corrupt or incomplete and the transaction commit fails, as it should. The problem is that, after one of these failures, every subsequent attempt to insert a new session fails with an error stating that the foreign-key constraint between Session and Data was violated, as if the fields were not deferred! The system still receives and inserts new entries in Upload, but the problem with inserting new sessions persists. If I destroy the database and recreate it, everything works fine until one of the transactions fails; after that, no more sessions can be inserted because of the foreign-key violation. What could be causing this behavior? Apparently, after a failed transaction, these fields no longer behave as deferred, as they were defined to.
I know my text is long, but it was the best way I found to present the problem. I thank in advance anyone who takes the time to read it and possibly share their expertise.
The software versions are Postgres 10.12; Psycopg2 2.8.5; Django 2.2.12; Python 3.6.9; Ubuntu 18.04.
The steps to fully reproduce my problem are listed below. For some of you much of this will be unnecessary or too obvious, but I chose to include everything so that anyone who wishes can follow along. If you use different software you will have to adapt accordingly. I modified the example system so that it is completely independent of Django.
A - Log in to your Ubuntu system
B - Install the software (some of these steps may be unnecessary)
sudo apt update
sudo apt install python3-pip python3-dev libpq-dev postgresql postgresql-contrib
C - Create an alt_so_reprex directory in your Linux home directory and cd into it
mkdir alt_so_reprex
cd alt_so_reprex
D - Create a virtual environment
virtualenv venv
source venv/bin/activate
pip install psycopg2
E - Create the 5 scripts listed below, replacing {YOUR_USERNAME} with your Linux username in each of them, and give them run permissions:
chmod +x 1_user_and_db.sh 2_db_tables.py 3_insert_uploads.py 4_create_test_files.sh 5_insert_sessions.py
Script 1: 1_user_and_db.sh
#!/bin/bash

# Create user if it does not exist
if [ "$( sudo -u postgres -H -- psql -c "SELECT 1 FROM pg_roles WHERE rolname='{YOUR_USERNAME}'" )" != '1' ]
then
    sudo -u postgres -H -- psql -c "CREATE USER {YOUR_USERNAME} WITH PASSWORD 'password';";
fi

# Create the PgSQL database (ignore the error the first time this runs)
sudo -u postgres -H -- psql -c "DROP DATABASE test_db;";
sudo -u postgres -H -- psql -c "CREATE DATABASE test_db;";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET client_encoding TO 'utf8';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET default_transaction_isolation TO 'read committed';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET timezone TO 'UTC';";
sudo -u postgres -H -- psql -d test_db -c "GRANT ALL PRIVILEGES ON DATABASE test_db TO {YOUR_USERNAME};";

# Show database
sudo -u postgres -H -- psql -d test_db -c "\l";
Script 2: 2_db_tables.py (based on @snakecharmerb's contribution - thank you)
#!/usr/bin/env python3
import psycopg2

# TABLE CREATION
reprex_upload = """CREATE TABLE Reprex_Upload (
    id BIGSERIAL PRIMARY KEY,
    zip_file VARCHAR(128),
    processed BOOLEAN DEFAULT FALSE
)
"""

reprex_session = """CREATE TABLE Reprex_Session (
    id BIGSERIAL PRIMARY KEY,
    metadata VARCHAR(128),
    upload_id BIGINT REFERENCES Reprex_Upload ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
)
"""

reprex_data = """CREATE TABLE Reprex_Data (
    id BIGSERIAL PRIMARY KEY,
    data VARCHAR(128),
    session_id BIGINT REFERENCES Reprex_Session ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
)"""

print("Creating tables...")

with psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}',
                      host='localhost', password='password') as conn:
    cur = conn.cursor()
    cur.execute(reprex_upload)
    cur.execute(reprex_session)
    cur.execute(reprex_data)
    conn.commit()
Script 3: 3_insert_uploads.py
#!/usr/bin/env python3
import psycopg2
from psycopg2 import sql

DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'

conn = None
cur = None
try:
    conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
    cur = conn.cursor()
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/bad_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    conn.commit()
except (Exception, psycopg2.Error) as err:
    print("Exception/Error:", err)
finally:
    # closing database conn.
    if cur:
        cur.close()
    if conn:
        conn.close()
        print("PostgreSQL conn is closed")
Script 4: 4_create_test_files.sh
#!/bin/bash
mkdir uploads
cd uploads
rm *
{ echo "metadata"; echo "Session data..."; } > 123_Session.csv
{ echo "data"; echo "Data 1..."; } > 123_Data.csv
zip 123_session.zip 123_Data.csv 123_Session.csv
zip ok_upload.zip 123_session.zip
rm 123_session.zip
zip 123_session.zip 123_Session.csv
zip bad_upload.zip 123_session.zip
rm 123*
Script 5: 5_insert_sessions.py
#!/usr/bin/env python3
import psycopg2
from psycopg2 import sql
import csv
from zipfile import ZipFile
import os
import shutil
import sys

MEDIA_ROOT_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/'
EXTRACTED_UPLOADS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_uploads/'
EXTRACTED_SESSIONS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_sessions/'

DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'


def insert_csv(filepath, message, table, num_args, foreign_key):
    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        next(reader)  # Skip the header row
        count = 0
        print(message)
        arguments_format = sql.SQL(', ').join(sql.Placeholder() * (num_args - 1))
        print('The arguments format is:', arguments_format.as_string(connection))
        for row in reader:
            row.append(foreign_key)
            cursor.execute(
                sql.SQL('INSERT INTO {} VALUES (DEFAULT, {})').format(
                    sql.Identifier(table), arguments_format),
                row)
            count += 1
        print(count, 'record(s) will be inserted into %s table' % table)


def get_unprocessed_uploaded_zips():
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT * FROM reprex_upload WHERE processed=FALSE"
        cur.execute(query)
        res = cur.fetchall()
        # return true and res
        return True, res
    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None
    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("PostgreSQL conn is closed")


# COALESCE is used for the first insertion ever, where a NULL would be returned
def get_last_session_id():
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT COALESCE(MAX(id), 0) FROM reprex_session"
        cur.execute(query)
        result = cur.fetchone()
        # return true and results
        return True, result[0]
    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None
    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("PostgreSQL conn is closed")


# get all entries in Upload which are unprocessed
query_success, results = get_unprocessed_uploaded_zips()
if query_success is False:
    sys.exit()

uploaded_zips = 0
for unprocessed_upload in results:
    uploaded_zips += 1
    print('\n\t' + '### UNPROCESSED UPLOAD ' + str(uploaded_zips) + ' ###\n')
    # The id field is the first one
    upload_zip_id = unprocessed_upload[0]
    # The zip_file field is the second one
    upload_zip_path = unprocessed_upload[1]
    print(upload_zip_path)
    # The filename will be the second part of the filepath
    upload_zip_name = upload_zip_path.split('/')[1]
    print(upload_zip_name)
    print(upload_zip_path)
    # The full filepath
    upload_zip_full_path = MEDIA_ROOT_DIR + upload_zip_path
    print(upload_zip_full_path)
    if upload_zip_full_path.endswith('.zip'):
        print('There is a new upload zip file: ' + upload_zip_full_path)
        # the folder name will be the file name minus the .zip extension
        upload_zip_folder_name = upload_zip_name.split('.')[0]
        upload_zip_folder_path = EXTRACTED_UPLOADS_DIR + upload_zip_folder_name
        # Create a ZipFile Object and load the received zip file in it
        with ZipFile(upload_zip_full_path, 'r') as zipObj:
            # Extract all the contents of zip file to the referred directory
            zipObj.extractall(upload_zip_folder_path)
        inserted_sessions = 0
        # Iterate over all session files inserting data in database
        for session_zip in os.scandir(upload_zip_folder_path):
            inserted_sessions += 1
            print('\n\t\t' + '### INSERTING SESSION ' + str(inserted_sessions) + ' ###\n')
            if session_zip.path.endswith('.zip') and session_zip.is_file():
                print('There is a new session zip file: ' + session_zip.name + '\n' +
                      'Located in: ' + session_zip.path)
                # the folder name will be the file name minus the .zip extension
                session_zip_folder_name = session_zip.name.split('.')[0]
                session_zip_folder_path = EXTRACTED_SESSIONS_DIR + session_zip_folder_name
                # Create a ZipFile Object and load the received zip file in it
                with ZipFile(session_zip, 'r') as zipObj:
                    # Extract all the contents of zip file to the referred directory
                    zipObj.extractall(session_zip_folder_path)
                session_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Session.csv')
                data_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Data.csv')
                # get the latest session id and increase it by one
                query_success, last_session_id = get_last_session_id()
                if query_success is False:
                    sys.exit()
                session_id = last_session_id + 1
                print('The session ID will be: ', session_id)
                connection = None
                cursor = None
                try:
                    # open a new database connection
                    connection = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
                    cursor = connection.cursor()
                    # First insert the Session file -> Link entry to Upload entry (upload_zip_id)
                    insert_csv(session_file_path, 'PROCESSING SESSION!\n', 'reprex_session', 3, upload_zip_id)
                    # Then insert the Data file -> Link entry to Session entry (session_id)
                    insert_csv(data_file_path, 'PROCESSING DATA!\n', 'reprex_data', 3, session_id)
                    # modify the Upload entry to processed
                    update = "UPDATE reprex_upload SET processed=TRUE WHERE id=%s"
                    cursor.execute(update, (upload_zip_id,))
                    # make all changes or none
                    connection.commit()
                except (Exception, psycopg2.Error) as error:
                    # print error message
                    if connection:
                        print('ERROR:', error)
                finally:
                    # closing database connection.
                    if cursor:
                        cursor.close()
                    if connection:
                        connection.close()
                        print("PostgreSQL connection is closed")
                # Remove folder with extracted content - this erases the csv files
                try:
                    shutil.rmtree(session_zip_folder_path)
                except OSError as e:
                    print("Error: %s" % e.strerror)
        # Remove folder with extracted content - this erases the session zips
        try:
            shutil.rmtree(upload_zip_folder_path)
        except OSError as e:
            print("Error: %s " % e.strerror)
F - Run the 5 scripts in order. You will see that the "bad upload" causes the second "good upload" not to be inserted, due to a foreign-key violation. More uploads can be inserted with script 3, but no more sessions can be inserted. If you delete the "bad upload" entry manually, you will see that you still cannot insert more sessions, again because of the foreign-key violation. However, if you recreate the database, starting over from script 1, the "good sessions" can be inserted again. If you remove the "bad upload" from the uploads directory, you can insert as many sessions as you want. But after a single error there is always a foreign-key violation, as if the constraint were not deferred.
I changed the original title of this question because I have since found that the problem is in no way caused by Django. I also changed the database model to a simpler one than I originally presented and updated the original text to reflect this. I removed the Django tag as well.
The specific error in this example could be avoided by checking whether the correct CSVs exist inside the zip, but in my real system other errors can occur. What I need is a solution for this apparent change in the constraints' behavior. I know I have been very verbose, and I thank you for your patience and help.
With the help of a colleague, I found the solution to the problem.
The failed transaction with the "bad_upload" internally increments the sequence's next value. So, after one failed transaction, the ID assigned to the next session will not be the current maximum ID + 1, but the current maximum ID + 2.
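This drift can be sketched without a live database. The toy `Sequence` class below is a hypothetical stand-in (not a psycopg2 or Postgres API) that mimics the one documented property that matters here, that nextval is never rolled back, and shows how MAX(id) + 1 diverges from the value DEFAULT will actually assign:

```python
class Sequence:
    """Toy model of a Postgres sequence: nextval() is never rolled back."""
    def __init__(self):
        self.value = 0

    def nextval(self):
        self.value += 1
        return self.value


seq = Sequence()
committed_ids = []

# Transaction 1 commits: the session row keeps id 1.
committed_ids.append(seq.nextval())

# Transaction 2 aborts after fetching an id: no row with id 2 is ever
# committed, but the sequence still advances past it.
_discarded = seq.nextval()

# The script's assumption: the next session id will be MAX(id) + 1 == 2.
assumed_next_id = max(committed_ids) + 1

# What DEFAULT will actually produce for the next session row: 3.
actual_next_id = seq.value + 1

print(assumed_next_id, actual_next_id)  # 2 3
```

The data rows therefore reference id 2 while the session row is committed with id 3, which is exactly the foreign-key violation reported at commit time.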
To avoid this kind of problem, the correct way to obtain the next session ID is to perform the session insert with:
cursor.execute(
    sql.SQL('INSERT INTO {} VALUES (DEFAULT, {}) RETURNING id').format(
        sql.Identifier(table), arguments_format),
    row)
and then fetch the ID to be used with (fetchone() returns a tuple, hence the [0]):

session_id = cursor.fetchone()[0]
That session ID is then used when inserting into the Data table.
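As a minimal, self-contained sketch of the corrected flow, the snippet below uses the standard-library sqlite3 module so it runs without a Postgres server, with `lastrowid` standing in for Postgres's `RETURNING id`; the table definitions are trimmed-down stand-ins for the reprex tables above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE reprex_session (id INTEGER PRIMARY KEY, metadata TEXT)")
cur.execute("CREATE TABLE reprex_data (id INTEGER PRIMARY KEY, data TEXT, "
            "session_id INTEGER REFERENCES reprex_session)")

# Insert the session and let the database assign the id...
cur.execute("INSERT INTO reprex_session (metadata) VALUES (?)", ("Session data...",))
session_id = cur.lastrowid  # with psycopg2: session_id = cursor.fetchone()[0]

# ...then reference that id when inserting the data rows, all in one transaction.
cur.execute("INSERT INTO reprex_data (data, session_id) VALUES (?, ?)",
            ("Data 1...", session_id))
conn.commit()

cur.execute("SELECT session_id FROM reprex_data")
print(cur.fetchone()[0])  # matches session_id
```

The point is the same in both databases: let the server assign the primary key and hand it back inside the transaction, instead of computing it from MAX(id) in a separate query.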
Per the Postgres documentation: "To avoid blocking concurrent transactions that obtain numbers from the same sequence, a nextval operation is never rolled back; that is, once a value has been fetched it is considered used and will not be returned again. This is true even if the surrounding transaction later aborts, or if the calling query ends up not using the value."
So this turned out to be a Postgres matter, not even a Psycopg2 one. Many thanks to everyone who helped with this question.