交易回滚

问题描述 投票:0回答:1

我有一个大列表,它本身由53,000,000个较小的列表作为元素。我想将这些较小的列表中的每一个作为行连续提交给db,批处理大小为1,000,000,这意味着每次脚本连接到db时,它都会提交1000,000个元素,然后断开与db的连接,然后再次连接以提交另外1,000,000行。

现在,我的问题是,如果在中间发生错误,例如,在提交50,000,000行之后,例如,我需要删除数据库中的所有行,并尝试从头开始提交所有内容。

我以为也许我可以使用rollback()来删除目前已添加的所有50,000,000行,但是只要我使用循环,就不知道如何回退所有在其中提交的5000万行批次。

有人有建议吗?

这是我的脚本:“结果”是将53,000,000个较小的列表作为元素的列表。

batch = []
counter = 0
BATCH_SIZE =1000000
cursor_count = 0

def prepare_names(names):
    return [w.replace("'", '') for w in names]

for i in range(len(results)):
    if counter < BATCH_SIZE:
        batch.append(prepare_names([results[i][0], results[i][1], results[i][2]]))  # batch => [[ACC1234.0, 'Some full taxa name'], ...]
        counter += 1
    else:
        batch.append(prepare_names([results[i][0], results[i][1], results[i][2]]))

        values = (", ").join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
        sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"

        try:
            cursor.execute(sql)
            db.commit()
        except Exception as exception:
            print(exception)
            print(f"Problem with query: {sql}")

        print(cursor.rowcount, "Records Inserted")
        cursor_count += cursor.rowcount
        counter = 0
        batch = []
else:
    if batch:
        values = (", ").join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
        sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"

        try:
            cursor.execute(sql)
            db.commit()
        except Exception as exception:
            print(exception)
            print(f"Problem with query: {sql}")

        print(cursor.rowcount, "Records Inserted")
        cursor_count += cursor.rowcount

print("Total Number Of %s Rows Has Been Added." %(cursor_count))
db.close()
python mysql transactions rollback
1个回答
0
投票

commit之后没有回滚。

考虑此:

1st Attempt 1M rows : committed
2nd Attempt 1M rows : committed
3rd Attempt 1m rows : error

您只能回滚第三次尝试。第一和第二完成。

解决方法修改accession_taxonomy表并添加一个名为insertHash的字段。您的批处理更新过程对此字段将具有唯一值-对于此批处理执行。假设todaysDate--如果插入步骤失败,则可以执行

Delete T from accession_taxonomy T Where T.insertHash ='TheValueUSet'

从本质上讲,它变成了这样:

1st Attempt 1M rows : committed
2nd Attempt 1M rows : committed
3rd Attempt 1m rows : error
Delete AllRows where insertHash = 'TheValueUSet'

已经说过,您确定要拍摄100万行吗?您是否检查过您的服务器是否能够接受该大数据包?

© www.soinside.com 2019 - 2024. All rights reserved.