psycopg2.errors.DeadlockDetected: deadlock detected


Question

I have a batch job that runs whenever a user updates a row in the UI. Users are allowed to update several rows at once, which triggers multiple instances of the batch job, each with a unique run_id.

The job creates a CSV file and inserts its values into a table (allocations_update). Once the values have been dumped into that table, we update a second table (allocations_od) using the values from the first one (allocations_update).

The query that updates allocations_od is:

UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc 
FROM db.allocations_update src
WHERE  src.run_id = '{run_id}' 
  AND  src.col1 = target.col1
  AND  src.col2 = target.col2

However, sometimes when a user triggers multiple instances of this job (by updating several rows at the same time), I get a deadlock error when the second update query against allocations_od runs.

The full error message is shown below:

psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 15455 waits for ShareLock on transaction 62597603; blocked by process 15538.
Process 15538 waits for ShareLock on transaction 62597592; blocked by process 15455.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (479821,43) in relation "allocations_od_20230514"

I would like to know what is causing the deadlock. My best guess is that some other instance of the job is still running the first query, which holds locks on allocations_update, so the two processes end up blocking each other.
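
For what it's worth, two UPDATE statements alone can already deadlock each other: each statement locks the matching allocations_od rows one by one, in whatever order its plan scans them, so two jobs whose run_ids map to overlapping target rows can each lock part of the row set and then wait for the other. Here is a minimal sketch of that interleaving with psycopg2 (the demo table, connection string, and row ids are made up for illustration):

import threading
import time

import psycopg2
from psycopg2 import errors

DSN = "dbname=scratch"  # hypothetical scratch database


def worker(first_id, second_id):
    """One 'batch job': updates two rows, in its own order."""
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            cur.execute("UPDATE demo SET val = val + 1 WHERE id = %s", (first_id,))
            time.sleep(1)  # let the other worker lock its first row
            cur.execute("UPDATE demo SET val = val + 1 WHERE id = %s", (second_id,))
        conn.commit()
    except errors.DeadlockDetected as exc:
        conn.rollback()
        print("deadlock:", exc)
    finally:
        conn.close()


# Setup (once): CREATE TABLE demo (id int PRIMARY KEY, val int);
#               INSERT INTO demo VALUES (1, 0), (2, 0);
a = threading.Thread(target=worker, args=(1, 2))
b = threading.Thread(target=worker, args=(2, 1))
a.start(); b.start(); a.join(); b.join()

Postgres aborts one of the two transactions (the DeadlockDetected in the traceback above) and lets the other proceed, which matches the behaviour I am seeing.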

My code

The whole batch process is fairly long and complex, but here is the final part, which is where the problem occurs:

    def update_alloc_query(self, final_data, stage_location):
        """ Method to bulk update allocations od table"""

        # stage_location is the s3 path of csv file.

        last_created_date = self.get_last_created_date()
        last_created_date = last_created_date.strftime('%Y-%m-%d')
        final_data['created_date'] = last_created_date
        run_id = final_data['run_id'].unique()[0]
        s3.s3_upload_df(stage_location, final_data)
        UITableLoader.bulk_upload_from_csv(db_model=AllocationsUpdate,
                                           file_location=stage_location,
                                           data_types={"rsid": "str", "passenger_class": "str",
                                                       "journey_origin": "str",
                                                       "journey_destination": "str",
                                                       "bucket_code": "str",
                                                       "eff_departure_date": "str",
                                                       "recommended_allocation": "float",
                                                       "run_id": "str"},
                                           sep="|",
                                           created_date=last_created_date)
        self.logger.info("Added table into new data")
        allo_sql = f"""
            UPDATE db.allocations_od target
            SET rec_alloc = src.rec_alloc
            FROM db.allocations_update src
            WHERE src.run_id = '{run_id}'
              AND src.col1 = target.col1
              AND src.col2 = target.col2
        """
        execute_sql_statement(allo_sql)
        self.logger.info("executed update query")
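
One mitigation I am considering (an untested sketch, not my current code) is to take the target-row locks in a fixed order before the UPDATE, inside a single transaction, so that overlapping jobs queue up instead of deadlocking. This cannot go through execute_sql_statement below as written, because that helper commits per call; both statements have to share one transaction (dbsession is assumed to be the same SQLAlchemy session the helpers use):

from sqlalchemy import text


def update_alloc_locked(dbsession, run_id):
    """Sketch: lock matching allocations_od rows in (col1, col2) order,
    then update them, all within one transaction."""
    lock_sql = text("""
        SELECT 1
        FROM db.allocations_od target
        JOIN db.allocations_update src
          ON src.col1 = target.col1 AND src.col2 = target.col2
        WHERE src.run_id = :run_id
        ORDER BY target.col1, target.col2
        FOR UPDATE OF target
    """)
    update_sql = text("""
        UPDATE db.allocations_od target
        SET rec_alloc = src.rec_alloc
        FROM db.allocations_update src
        WHERE src.run_id = :run_id
          AND src.col1 = target.col1
          AND src.col2 = target.col2
    """)
    try:
        # Every job acquires the row locks in the same order, so two jobs
        # touching overlapping rows serialize instead of deadlocking.
        dbsession.execute(lock_sql, {"run_id": run_id})
        dbsession.execute(update_sql, {"run_id": run_id})
        dbsession.commit()
    except Exception:
        dbsession.rollback()
        raise

As a side benefit this replaces the f-string interpolation of run_id with a bound parameter.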



# UITableLoader.bulk_upload_from_csv


    @staticmethod
    def bulk_upload_from_csv(db_model, file_location, data_types=None, sep=',',
                             created_date=None, chunk_size=1000):
        """Function uploads data from local csv file to sql alchemy db."""
        LOGGER.info("Bulk loading data.",
                    file_location=file_location, table=db_model.__table__)
        record_count = 0
        chunks = pd.read_csv(
            file_location,
            dtype=data_types,
            chunksize=chunk_size,
            sep=sep,
            on_bad_lines='skip'
        )

        for chunk in chunks:
            # Map NaN values to None so they are stored as SQL NULL
            chunk = chunk.where((pd.notnull(chunk)), None)
            chunk = chunk.replace({np.nan: None})
            record_count += chunk.shape[0]
            if created_date is not None:
                chunk['created_date'] = created_date
            rows = chunk.to_dict(orient='records')
            sqa_save(db_model, rows, save_many=True)

        return record_count


def execute_sql_statement(sql_statement, conn_string=None):  # pragma: no cover
    """Executes the given sql_statement"""
    if not sql_statement:
        return
    if not conn_string:
        conn_string = get_db_connection_string()
    dbsession = get_db_session(conn_string)
    try:
        dbsession.execute(sql_statement)
        dbsession.commit()
    except SQLAlchemyError as ex:
        LOGGER.exception(f"Error executing sql statement '{sql_statement}'")
        dbsession.rollback()
        raise ex
    finally:
        dbsession.close()
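
Since Postgres resolves a deadlock by aborting one victim and letting the other transaction finish, retrying the statement is the other obvious fallback. A sketch, assuming psycopg2 >= 2.8: SQLAlchemy wraps psycopg2.errors.DeadlockDetected in OperationalError and keeps the original exception in .orig, so a retry wrapper around execute_sql_statement could look like this:

import time

from psycopg2.errors import DeadlockDetected
from sqlalchemy.exc import OperationalError


def execute_with_retry(sql_statement, retries=3, backoff=1.0):
    """Re-run sql_statement if it was chosen as the deadlock victim."""
    for attempt in range(1, retries + 1):
        try:
            execute_sql_statement(sql_statement)
            return
        except OperationalError as ex:
            if isinstance(ex.orig, DeadlockDetected) and attempt < retries:
                time.sleep(backoff * attempt)  # let the surviving job commit
                continue
            raise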

python postgresql psycopg2 deadlock database-deadlocks