Does a failed transaction with Psycopg2 cause SQL constraints to stop behaving as deferred?


I have a Python script that uses the Psycopg2 library to insert data into a Postgres database. The tables were created with Django migrations. Although the database is used by Django, it is part of a data-analysis system and is also accessed and manipulated with Psycopg2. Everything runs on Ubuntu. A simplified version of the database is shown below.

[Image: simplified database structure]

Django receives zip files through POST requests and adds a corresponding entry to the Upload table; each entry holds the location of its zip file. Inside each upload zip there are session zips, which in turn contain CSV files with the related data. The session zips and CSV files have no references in the database, but their information is inserted with the script mentioned above. In my full system there are more tables, but for this question the per-session Data table is enough: each session zip should contain 2 CSVs, one with the data and another with the session metadata.
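For illustration, this is the nesting assumed throughout the question (the file names below are the ones created by the test-file script in the minimal example further down; real uploads only need to follow the same structure):

ok_upload.zip
└── 123_session.zip
    ├── 123_Session.csv   (session metadata)
    └── 123_Data.csv      (session data)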

The script's main loop is outlined below.

[Image: script cycle]

So, basically, for each Upload zip the sessions are extracted and inserted into the database one by one. A Session's metadata and the corresponding Data are inserted in a single transaction. Since Data has a foreign key referencing Session, these fields have to be deferred; the constraints are set to DEFERRABLE INITIALLY DEFERRED. The session ID primary key to be used is computed by taking the largest existing session ID and incrementing it by one.
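To make the deferred behaviour concrete, here is a minimal sketch of what DEFERRABLE INITIALLY DEFERRED allows, using the reprex_session/reprex_data tables from the reproducible example further down (it assumes an empty test_db created by scripts 1 and 2, with {YOUR_USERNAME} substituted): child rows can be inserted before the parent row inside one transaction, because the foreign key is only checked at COMMIT.

import psycopg2

# Assumes the empty test_db and the tables created by scripts 1 and 2 below.
conn = psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}',
                        password='password', host='localhost')
cur = conn.cursor()

# Insert a Data row referencing a Session id that does not exist yet;
# the deferred foreign key is not checked at this point...
cur.execute("INSERT INTO reprex_data (data, session_id) VALUES (%s, %s)",
            ('Data 1...', 1))

# ...and only then insert the Session row with that id.
cur.execute("INSERT INTO reprex_session (id, metadata) VALUES (%s, %s)",
            (1, 'Session data...'))

# The deferred constraints are validated here, at commit time.
conn.commit()
conn.close()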

Sometimes the received data is corrupted or incomplete and the transaction commit fails, as it should. The problem is that after one of these failures, every new attempt to insert a session fails, with an error message stating that the foreign key constraint between Session and Data was violated, as if the fields were not deferred!

The system still receives and inserts new entries into Upload, but the problem with inserting new sessions persists. If I destroy the database and recreate it, everything works fine until one of the transactions fails again. After that, once more no sessions can be inserted because of the foreign key violation.

What could be causing this behaviour? Apparently, after a failed transaction those fields no longer behave as deferred, even though they are defined that way.

I know this text is long, but it seemed the best way to present the problem. I thank in advance anyone who takes the time to read it and perhaps share their expertise.

The software versions are Postgres 10.12; Psycopg2 2.8.5; Django 2.2.12; Python 3.6.9; Ubuntu 18.04.

Update - minimal reproducible example

The steps to fully reproduce my problem are listed below. Many of them will be unnecessary or too obvious for some of you, but I chose to include everything so that anyone who wants to can follow along. You will have to adapt this if you use different software. I modified the example system so that it is completely independent of Django.

A - Log into your Ubuntu system

B - Install the software (some of these may be unnecessary)

sudo apt update
sudo apt install python3-pip python3-dev libpq-dev postgresql postgresql-contrib

C - In your Linux home directory, create an alt_so_reprex directory and cd into it

mkdir alt_so_reprex
cd alt_so_reprex

D - Create a virtual environment

virtualenv venv
source venv/bin/activate
pip install psycopg2

E - Create the 5 scripts listed below. In each script, replace {YOUR_USERNAME} with your Linux username. Grant them execution permission:

chmod +x 1_user_and_db.sh 2_db_tables.py 3_insert_uploads.py 4_create_test_files.sh 5_insert_sessions.py

Script 1: 1_user_and_db.sh

#!/bin/bash

# Create user if it does not exist
if [ "$( sudo -u postgres -H -- psql -c "SELECT 1 FROM pg_roles WHERE rolname='{YOUR_USERNAME}'" )" != '1' ]
then
    sudo -u postgres -H -- psql -c "CREATE USER {YOUR_USERNAME} WITH PASSWORD 'password';";
fi

# Create the PgSQL database (ignore the error the first time this runs)
sudo -u postgres -H -- psql -c "DROP DATABASE test_db;";
sudo -u postgres -H -- psql -c "CREATE DATABASE test_db;";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET client_encoding TO 'utf8';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET default_transaction_isolation TO 'read committed';";
sudo -u postgres -H -- psql -d test_db -c "ALTER ROLE {YOUR_USERNAME} SET timezone TO 'UTC';";
sudo -u postgres -H -- psql -d test_db -c "GRANT ALL PRIVILEGES ON DATABASE test_db TO {YOUR_USERNAME};";

# Show database
sudo -u postgres -H -- psql -d test_db -c "\l";

Script 2: 2_db_tables.py (based on @snakecharmerb's contribution - thank you)

#!/usr/bin/env python3

import psycopg2

# TABLE CREATION

reprex_upload = """CREATE TABLE Reprex_Upload (
    id BIGSERIAL PRIMARY KEY,
    zip_file VARCHAR(128),
    processed BOOLEAN DEFAULT FALSE
) """

reprex_session = """CREATE TABLE Reprex_Session (
    id BIGSERIAL PRIMARY KEY,
    metadata VARCHAR(128),
    upload_id BIGINT REFERENCES Reprex_Upload ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED 
) """

reprex_data = """CREATE TABLE Reprex_Data (
    id BIGSERIAL PRIMARY KEY,
    data VARCHAR(128),
    session_id BIGINT REFERENCES Reprex_Session ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
)"""

print("Creating tables...")
with psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}', host='localhost', password='password') as conn:
    cur = conn.cursor()
    cur.execute(reprex_upload)
    cur.execute(reprex_session)
    cur.execute(reprex_data)
    conn.commit()

Script 3: 3_insert_uploads.py

#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql

DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'

conn = None
cur = None

try:
    conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
    cur = conn.cursor()
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/bad_upload.zip', DEFAULT)"))
    cur.execute(sql.SQL("INSERT INTO reprex_upload VALUES (DEFAULT, 'uploads/ok_upload.zip', DEFAULT)"))
    conn.commit()

except (Exception, psycopg2.Error) as err:
    print("Exception/Error:", err)

finally:
    # closing database conn.
    if cur:
        cur.close()
    if conn:
        conn.close()
    print("PostgreSQL conn is closed")

Script 4: 4_create_test_files.sh

#!/bin/bash

mkdir uploads
cd uploads
rm *
{ echo "metadata"; echo "Session data..."; } > 123_Session.csv
{ echo "data"; echo "Data 1..."; } > 123_Data.csv
zip 123_session.zip 123_Data.csv 123_Session.csv
zip ok_upload.zip 123_session.zip
rm 123_session.zip
zip 123_session.zip 123_Session.csv
zip bad_upload.zip 123_session.zip
rm 123*

Script 5: 5_insert_sessions.py

#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql
import csv
from zipfile import ZipFile
import os
import shutil
import sys

MEDIA_ROOT_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/'
EXTRACTED_UPLOADS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_uploads/'
EXTRACTED_SESSIONS_DIR = '/home/{YOUR_USERNAME}/alt_so_reprex/extracted_sessions/'
DATABASE = 'test_db'
USER = '{YOUR_USERNAME}'
PASSWORD = 'password'


def insert_csv(filepath, message, table, num_args, foreign_key):
    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        next(reader)  # Skip the header row
        count = 0

        print(message)

        arguments_format = sql.SQL(', ').join(sql.Placeholder() * (num_args - 1))
        print('The arguments format is:', arguments_format.as_string(connection))

        for row in reader:
            row.append(foreign_key)
            cursor.execute(
                sql.SQL('INSERT INTO {} VALUES (DEFAULT, {})').format(sql.Identifier(table), arguments_format), row)
            count += 1

        print(count, 'record(s) will be inserted into %s table' % table)


def get_unprocessed_uploaded_zips():

    conn = None
    cur = None

    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT * FROM reprex_upload WHERE processed=FALSE"
        cur.execute(query)
        res = cur.fetchall()

        # return true and res
        return True, res

    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None

    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
        print("PostgreSQL conn is closed")


# COALESCE is used for the first insertion ever, where a NULL would be returned
def get_last_session_id():

    conn = None
    cur = None

    try:
        conn = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
        cur = conn.cursor()
        query = "SELECT COALESCE(MAX(id), 0) FROM reprex_session"
        cur.execute(query)
        result = cur.fetchone()

        # return true and results
        return True, result[0]

    except (Exception, psycopg2.Error) as err:
        # return false and err message
        print("Exception/Error:", err)
        return False, None

    finally:
        # closing database conn.
        if cur:
            cur.close()
        if conn:
            conn.close()
        print("PostgreSQL conn is closed")


# get all entries in Upload which are unprocessed
query_success, results = get_unprocessed_uploaded_zips()

if query_success is False:
    sys.exit()

uploaded_zips = 0

for unprocessed_upload in results:

    uploaded_zips += 1
    print('\n\t' + '### UNPROCESSED UPLOAD ' + str(uploaded_zips) + ' ###\n')

    # The id field is the first one
    upload_zip_id = unprocessed_upload[0]

    # The zip_file field is the second one
    upload_zip_path = unprocessed_upload[1]
    print(upload_zip_path)

    # The filename will be the second part of the filepath
    upload_zip_name = upload_zip_path.split('/')[1]
    print(upload_zip_name)
    print(upload_zip_path)

    # The full filepath
    upload_zip_full_path = MEDIA_ROOT_DIR + upload_zip_path
    print(upload_zip_full_path)

    if upload_zip_full_path.endswith('.zip'):

        print('There is a new upload zip file: ' + upload_zip_full_path)

        # the folder name will be the file name minus the .zip extension
        upload_zip_folder_name = upload_zip_name.split('.')[0]
        upload_zip_folder_path = EXTRACTED_UPLOADS_DIR + upload_zip_folder_name

        # Create a ZipFile Object and load the received zip file in it
        with ZipFile(upload_zip_full_path, 'r') as zipObj:
            # Extract all the contents of zip file to the referred directory
            zipObj.extractall(upload_zip_folder_path)

        inserted_sessions = 0
        # Iterate over all session files inserting data in database
        for session_zip in os.scandir(upload_zip_folder_path):
            inserted_sessions += 1
            print('\n\t\t' + '### INSERTING SESSION ' + str(inserted_sessions) + ' ###\n')
            if session_zip.path.endswith('.zip') and session_zip.is_file():
                print('There is a new session zip file: ' + session_zip.name + '\n' + 'Located in: ' + session_zip.path)

                # the folder name will be the file name minus the .zip extension
                session_zip_folder_name = session_zip.name.split('.')[0]
                session_zip_folder_path = EXTRACTED_SESSIONS_DIR + session_zip_folder_name

                # Create a ZipFile Object and load the received zip file in it
                with ZipFile(session_zip, 'r') as zipObj:
                    # Extract all the contents of zip file to the referred directory
                    zipObj.extractall(session_zip_folder_path)

                session_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Session.csv')
                data_file_path = session_zip_folder_path + '/' + \
                    session_zip_folder_name.replace('session', 'Data.csv')

                # get the latest session id and increase it by one
                query_success, last_session_id = get_last_session_id()

                if query_success is False:
                    sys.exit()

                session_id = last_session_id + 1

                print('The session ID will be: ', session_id)

                connection = None
                cursor = None

                try:
                    # open a new database connection
                    connection = psycopg2.connect(database=DATABASE, user=USER, password=PASSWORD)
                    cursor = connection.cursor()

                    # First insert the Session file -> Link entry to Upload entry (upload_zip_id)
                    insert_csv(session_file_path, 'PROCESSING SESSION!\n', 'reprex_session', 3,
                               upload_zip_id)

                    # Then insert the Data file -> Link entry to Session entry (session_id)
                    insert_csv(data_file_path, 'PROCESSING DATA!\n', 'reprex_data', 3, session_id)

                    # modify the Upload entry to processed
                    update = "UPDATE reprex_upload SET processed=TRUE WHERE id=%s"
                    cursor.execute(update, (upload_zip_id,))

                    # make all changes or none
                    connection.commit()
                except (Exception, psycopg2.Error) as error:
                    # print error message
                    if connection:
                        print('ERROR:', error)
                finally:
                    # closing database connection.
                    if cursor:
                        cursor.close()
                    if connection:
                        connection.close()
                    print("PostgreSQL connection is closed")

                # Remove folder with extracted content - this erases the csv files
                try:
                    shutil.rmtree(session_zip_folder_path)
                except OSError as e:
                    print("Error: %s" % e.strerror)

        # Remove folder with extracted content - this erases the session zips
        try:
            shutil.rmtree(upload_zip_folder_path)
        except OSError as e:
            print("Error: %s " % e.strerror)

F - Run the 5 scripts in order. You will verify that the "bad upload" causes the second "good upload" not to be inserted, due to a foreign key violation. More uploads can be inserted with script 3, but no more sessions can be inserted. If you manually delete the "bad upload" entry, you will verify that you still cannot insert more sessions because of the foreign key violation. However, if you recreate the database starting from script 1, the "good sessions" can be inserted again. If you remove the "bad upload" from the uploads directory, you can insert as many sessions as you want. But after a single error there is always a foreign key violation, as if the constraint were not deferred.
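As an extra diagnostic (not part of the original steps), you can check that the constraints themselves are still marked deferrable and initially deferred after a failed transaction, for example by querying the pg_constraint catalog; a minimal sketch:

import psycopg2

# Lists the foreign key constraints of the reprex tables together with
# their deferrable / initially-deferred flags.
with psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}',
                      password='password', host='localhost') as conn:
    cur = conn.cursor()
    cur.execute("""
        SELECT conname, conrelid::regclass, condeferrable, condeferred
        FROM pg_constraint
        WHERE contype = 'f'
          AND conrelid::regclass::text IN ('reprex_session', 'reprex_data')
    """)
    for name, table, deferrable, deferred in cur.fetchall():
        print(name, table, deferrable, deferred)

Both flags stay true, which is a first hint that the problem is not the constraints changing behaviour.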

I changed the original title of this question because I have now found that the problem is in no way caused by Django. I also changed the database model to a simpler one than I originally presented, and edited the original text to reflect this. I removed the Django tag as well.

The specific error in this example could be avoided by checking whether the correct CSVs are present inside the zips, but in my real system other errors can occur. What I need is a way to deal with this apparent change in the constraints' behaviour.

I know I have been very verbose, and I thank you for your patience and your help.

python postgresql ubuntu psycopg2

1 Answer

With the help of a colleague I found the solution to the problem.

The failed transaction for the "bad_upload" internally advanced the sequence to its next value. So, after one failed transaction, the id assigned to the next session is no longer the current maximum id + 1 but the current maximum id + 2, while the Data rows still reference the precomputed maximum id + 1, and the deferred foreign key check correctly fails at commit time.
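This is easy to demonstrate in isolation. Below is a minimal sketch (assuming a freshly recreated test_db in which reprex_session is still empty): a rolled-back insert still consumes a sequence value, so the next DEFAULT id skips MAX(id) + 1.

import psycopg2

# Demonstration that a rollback does not give a sequence value back.
# Run against a fresh test_db (scripts 1 and 2), with reprex_session still empty.
conn = psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}', password='password')
cur = conn.cursor()

# This insert consumes one value of reprex_session's id sequence...
cur.execute("INSERT INTO reprex_session (metadata) VALUES ('doomed')")
# ...and rolling back does NOT return that value to the sequence.
conn.rollback()

cur.execute("SELECT COALESCE(MAX(id), 0) FROM reprex_session")
max_id = cur.fetchone()[0]

cur.execute("INSERT INTO reprex_session (metadata) VALUES ('kept') RETURNING id")
new_id = cur.fetchone()[0]
conn.commit()

# Prints 0 and 2: MAX(id) + 1 was skipped because the rolled-back
# insert consumed sequence value 1.
print(max_id, new_id)
conn.close()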

To avoid this kind of problem, the correct way to obtain the next session id is to perform the session insert with:

cursor.execute(sql.SQL('INSERT INTO {} VALUES (DEFAULT, {}) RETURNING id').format(sql.Identifier(table), arguments_format), row)

and then fetch the id that was actually assigned:

session_id = cursor.fetchone()[0]

That session id is then used when inserting into the Data table.
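Putting it together, a minimal sketch of the corrected flow (simplified from script 5 above, with error handling and CSV parsing left out; it assumes the upload with id 1 inserted by script 3 exists):

import psycopg2
from psycopg2 import sql

conn = psycopg2.connect(dbname='test_db', user='{YOUR_USERNAME}', password='password')
cur = conn.cursor()

# Insert the Session row, let Postgres assign the id from the sequence
# and return it, instead of precomputing MAX(id) + 1.
cur.execute(
    sql.SQL("INSERT INTO {} VALUES (DEFAULT, %s, %s) RETURNING id").format(
        sql.Identifier('reprex_session')),
    ('Session data...', 1))            # metadata, upload_id (upload 1 assumed)
session_id = cur.fetchone()[0]

# Use the returned id as the foreign key of the Data row.
cur.execute(
    sql.SQL("INSERT INTO {} VALUES (DEFAULT, %s, %s)").format(
        sql.Identifier('reprex_data')),
    ('Data 1...', session_id))         # data, session_id

# Everything commits (or fails) together; the deferred FKs are checked here.
conn.commit()
conn.close()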

According to the Postgres documentation: "To avoid blocking concurrent transactions that obtain numbers from the same sequence, a nextval operation is never rolled back; that is, once a value has been fetched it is considered used and will not be returned again. This is true even if the surrounding transaction later aborts, or if the calling query ends up not using the value."

So this turned out to be a Postgres matter in the end, not even a Psycopg2 one. Many thanks to everyone who helped with this question.
