我有一个相当大的数据集,大约6,000,000行X 60列,我试图插入到数据库中。我正在对它们进行分块,并使用我编写的类和pymysql将它们一次插入到mysql数据库中。问题是,我偶尔会在写入时超时服务器,因此我修改了executemany调用以重新连接错误。当我失去连接一次时这很好用,但如果我第二次丢失错误,我会得到一个pymysql.InternalException,说明超过了锁等待超时。我想知道如何修改以下代码以捕获它并在再次尝试之前完全销毁该事务。
我已经尝试在连接上调用rollback(),但如果连接被销毁,则会导致另一个InternalException,因为不再有游标。
任何帮助将不胜感激(我也不明白为什么我要开始超时,但数据相对较大。)
class Database:
def __init__(self, **creds):
self.conn = None
self.user = creds['user']
self.password = creds['password']
self.host = creds['host']
self.port = creds['port']
self.database = creds['database']
def connect(self, type=None):
self.conn = pymysql.connect(
host = self.host,
user = self.user,
password = self.password,
port = self.port,
database = self.database
)
def executemany(self, sql, data):
while True:
try:
with self.conn.cursor() as cursor:
cursor.executemany(sql, data)
self.conn.commit()
break
except pymysql.err.OperationalError:
print('Connection error. Reconnecting to database.')
time.sleep(2)
self.connect()
continue
return cursor
而我这样称呼它:
for index, chunk in enumerate(dataframe_chunker(df), start=1):
print(f"Writing chunk\t{index}\t{timer():.2f}")
db.executemany(insert_query, chunk.values.tolist())
看看MySQL正在做什么。 lockwait超时是因为在其他内容完成之前无法完成插入,这可能是您自己的代码。
SELECT * FROM `information_schema`.`innodb_locks`;
将显示当前锁定。
select * from information_schema.innodb_trx where trx_id = [lock_trx_id];
将显示涉及的交易
SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST where id = [trx_mysql_thread_id];
将显示所涉及的连接,并可能显示其锁定导致锁定等待超时的查询。也许有一个未提交的交易。
它可能是您自己的代码,因为与executemany
函数的交互会捕获异常并重新连接到数据库。以前的连接是什么? lockwait超时是否会终止先前的连接?虽然这是真的会有麻烦。
对于在db连接上调用executemany
的代码,在try / except上更具防御性,例如:
def executemany(self, sql, data):
while True:
try:
with self.conn.cursor() as cursor:
cursor.executemany(sql, data)
self.conn.commit()
break
except pymysql.err.OperationalError:
print('Connection error. Reconnecting to database.')
if self.conn.is_connected():
connection.close()
finally:
time.sleep(2)
self.connect()
但是,如果没有其他数据库客户端,这里的解决方案是不会导致lockwait超时。