MySQL deadlock when using DataFrame.to_sql in a multithreaded environment

Problem description (votes: 0, answers: 3)

I have a multithreaded ETL process inside a docker container that looks like this simplified code:

import abc
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import sqlalchemy


class Query(abc.ABC):
    def __init__(self):
        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)

    def load(self, df: pd.DataFrame) -> None:
        df.to_sql(
            name=self.table, con=self.connection, if_exists="replace", index=False,
        )

    @abc.abstractmethod
    def transform(self, data: object) -> pd.DataFrame:
        pass

    @abc.abstractmethod
    def extract(self) -> object:
        pass

    #  other methods...


class ComplianceInfluxQuery(Query):
    # Implements abstract methods... load method is the same as Query class


ALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"), ComplianceInfluxQuery("cc_influx_we_count"), ....]


while True:
    with ThreadPoolExecutor(max_workers=8) as pool:
        for query in ALL_QUERIES:
            pool.submit(execute_etl, query) # execute_etl function calls extract, transform and load

Many classes inherit from Query, with the same implementation of load() shown in the Query class: it simply loads a pandas DataFrame object into a SQL table, replacing the table if it already exists.

All the classes run concurrently, and each loads its results into the MySQL database once extract() and transform() have finished. Each class loads a different table into the database.

I frequently get a deadlock from a random thread when the load() method is called:

2020-09-17 09:48:28,138 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,160 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,241 | ERROR | root | calculate | 124 | 1 | Failed to execute query ComplianceInfluxQuery
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
    conn.query(q)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
    packet.raise_for_error()
  File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
    raise errorclass(errno, errval)
pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "calculate.py", line 119, in execute_etl
    query.run()
  File "/chil_etl/query.py", line 45, in run
    self.load(df)
  File "/chil_etl/query.py", line 22, in load
    df.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/core/generic.py", line 2653, in to_sql
    sql.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 512, in to_sql
    pandas_sql.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 1316, in to_sql
    table.create()
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 649, in create
    self._execute_create()
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 641, in _execute_create
    self.table.create()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 927, in create
    bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2097, in _run_visitor
    conn._run_visitor(visitorcallable, element, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1656, in _run_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 145, in traverse_single
    return meth(obj, **kw)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 827, in visit_table
    self.connection.execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
    conn.query(q)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
    packet.raise_for_error()
  File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
    raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: 
CREATE TABLE cc_influx_share_count (
    unique_identifier TEXT, 
    nfs_share_count FLOAT(53), 
    smb_share_count FLOAT(53), 
    s3_bucket_count FLOAT(53)
)

]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

The logs show that two threads called the load() method almost simultaneously. This can happen with any of the classes, regardless of the data.

I ran SHOW ENGINE INNODB STATUS, and no deadlocks are listed there.

I checked the mysql.general_log table to get a better picture of what happened during the deadlock, but noticed nothing useful beyond the fact that the deadlocked thread never inserted any values into the cc_influx_share_count table when (I believe) it should have:

  • The error was raised at 09:48:28,241

SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;

Output (sensitive data removed):

Time: 2020-09-17 09:48:27.010747. Event: COMMIT


Time: 2020-09-17 09:48:27.011075. Event: ROLLBACK


Time: 2020-09-17 09:48:27.012042. Event: CREATE TABLE cc_influx_last_metric_time (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    uptime BIGINT
)


Time: 2020-09-17 09:48:27.033973. Event: COMMIT


Time: 2020-09-17 09:48:27.034327. Event: ROLLBACK


Time: 2020-09-17 09:48:27.053837. Event: INSERT INTO cc_influx_last_metric_time (unique_identifier, timestamp, uptime) VALUES (...)


Time: 2020-09-17 09:48:27.066930. Event: COMMIT


Time: 2020-09-17 09:48:27.068657. Event: ROLLBACK


Time: 2020-09-17 09:48:27.887579. Event: DESCRIBE `container_post_deployments`


Time: 2020-09-17 09:48:27.889705. Event: ROLLBACK


Time: 2020-09-17 09:48:27.890186. Event: DESCRIBE `container_post_deployments`


Time: 2020-09-17 09:48:27.892125. Event: ROLLBACK


Time: 2020-09-17 09:48:27.892619. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:27.894964. Event: SHOW CREATE TABLE `container_post_deployments`


Time: 2020-09-17 09:48:27.896491. Event: ROLLBACK


Time: 2020-09-17 09:48:27.897097. Event: DROP TABLE container_post_deployments


Time: 2020-09-17 09:48:27.907816. Event: COMMIT


Time: 2020-09-17 09:48:27.908322. Event: ROLLBACK


Time: 2020-09-17 09:48:27.909890. Event: CREATE TABLE container_post_deployments (
    image TEXT, 
    `clientId` TEXT, 
    message TEXT, 
    timestamp TIMESTAMP NULL, 
    status_code BIGINT, 
    something TEXT, 
    user_agent TEXT
)


Time: 2020-09-17 09:48:27.928665. Event: COMMIT


Time: 2020-09-17 09:48:27.929089. Event: ROLLBACK


Time: 2020-09-17 09:48:27.932310. Event: INSERT INTO container_post_deployments (image, `clientId`, message, timestamp, status_code, something, user_agent) VALUES (...)


Time: 2020-09-17 09:48:27.934410. Event: COMMIT


Time: 2020-09-17 09:48:27.936774. Event: ROLLBACK


Time: 2020-09-17 09:48:28.140219. Event: DESCRIBE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.163517. Event: DESCRIBE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.166070. Event: ROLLBACK


Time: 2020-09-17 09:48:28.168159. Event: DESCRIBE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.169895. Event: ROLLBACK


Time: 2020-09-17 09:48:28.170583. Event: DESCRIBE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.174444. Event: ROLLBACK


Time: 2020-09-17 09:48:28.176339. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.177915. Event: ROLLBACK


Time: 2020-09-17 09:48:28.179331. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.182284. Event: SHOW CREATE TABLE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.185154. Event: ROLLBACK


Time: 2020-09-17 09:48:28.192493. Event: SHOW CREATE TABLE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.192887. Event: DROP TABLE cc_influx_share_count


Time: 2020-09-17 09:48:28.194530. Event: ROLLBACK


Time: 2020-09-17 09:48:28.195707. Event: DROP TABLE cc_influx_we_count


Time: 2020-09-17 09:48:28.207712. Event: COMMIT


Time: 2020-09-17 09:48:28.208141. Event: ROLLBACK


Time: 2020-09-17 09:48:28.210087. Event: CREATE TABLE cc_influx_share_count (
    unique_identifier TEXT, 
    nfs_share_count FLOAT(53), 
    smb_share_count FLOAT(53), 
    s3_bucket_count FLOAT(53)
)


Time: 2020-09-17 09:48:28.215350. Event: COMMIT


Time: 2020-09-17 09:48:28.216115. Event: ROLLBACK


Time: 2020-09-17 09:48:28.217996. Event: CREATE TABLE cc_influx_we_count (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    `ANF` FLOAT(53), 
    `S3` FLOAT(53), 
    `CVO` FLOAT(53)
)


Time: 2020-09-17 09:48:28.240455. Event: ROLLBACK


Time: 2020-09-17 09:48:28.240908. Event: ROLLBACK


Time: 2020-09-17 09:48:28.244425. Event: COMMIT


Time: 2020-09-17 09:48:28.244965. Event: ROLLBACK


Time: 2020-09-17 09:48:28.249009. Event: INSERT INTO cc_influx_we_count (unique_identifier, timestamp, `ANF`, `S3`, `CVO`) VALUES (...)


Time: 2020-09-17 09:48:28.253638. Event: COMMIT


Time: 2020-09-17 09:48:28.256299. Event: ROLLBACK


Time: 2020-09-17 09:48:28.525814. Event: DESCRIBE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.530211. Event: ROLLBACK


Time: 2020-09-17 09:48:28.532392. Event: DESCRIBE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.539685. Event: ROLLBACK


Time: 2020-09-17 09:48:28.541868. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.560271. Event: SHOW CREATE TABLE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.565451. Event: ROLLBACK


Time: 2020-09-17 09:48:28.569257. Event: DROP TABLE cc_influx_disk_usage


Time: 2020-09-17 09:48:28.585562. Event: COMMIT


Time: 2020-09-17 09:48:28.595193. Event: ROLLBACK


Time: 2020-09-17 09:48:28.598230. Event: CREATE TABLE cc_influx_disk_usage (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    total_gb FLOAT(53), 
    used_gb FLOAT(53)
)


Time: 2020-09-17 09:48:28.619580. Event: COMMIT


Time: 2020-09-17 09:48:28.620411. Event: ROLLBACK


Time: 2020-09-17 09:48:28.625385. Event: INSERT INTO cc_influx_disk_usage (unique_identifier, timestamp, total_gb, used_gb) VALUES (....)


Time: 2020-09-17 09:48:28.628706. Event: COMMIT


Time: 2020-09-17 09:48:28.631955. Event: ROLLBACK


Time: 2020-09-17 09:48:28.840143. Event: DESCRIBE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.844303. Event: ROLLBACK


Time: 2020-09-17 09:48:28.845637. Event: DESCRIBE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.848076. Event: ROLLBACK


Time: 2020-09-17 09:48:28.848646. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.851165. Event: SHOW CREATE TABLE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.852202. Event: ROLLBACK


Time: 2020-09-17 09:48:28.852691. Event: DROP TABLE cc_influx_aws_subscription


Time: 2020-09-17 09:48:28.861657. Event: COMMIT


Time: 2020-09-17 09:48:28.862099. Event: ROLLBACK


Time: 2020-09-17 09:48:28.863288. Event: CREATE TABLE cc_influx_aws_subscription (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    is_subscribed BIGINT
)


Time: 2020-09-17 09:48:28.878554. Event: COMMIT


Time: 2020-09-17 09:48:28.879113. Event: ROLLBACK


Time: 2020-09-17 09:48:28.881054. Event: INSERT INTO cc_influx_aws_subscription (unique_identifier, timestamp, is_subscribed) VALUES (....)


Time: 2020-09-17 09:48:28.882642. Event: COMMIT


Time: 2020-09-17 09:48:28.884614. Event: ROLLBACK


Time: 2020-09-17 09:48:28.918677. Event: DESCRIBE `hubspot_data`


Time: 2020-09-17 09:48:28.922938. Event: ROLLBACK


Time: 2020-09-17 09:48:28.923993. Event: DESCRIBE `hubspot_data`


Time: 2020-09-17 09:48:28.928181. Event: ROLLBACK


Time: 2020-09-17 09:48:28.928808. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.931225. Event: SHOW CREATE TABLE `hubspot_data`


Time: 2020-09-17 09:48:28.934269. Event: ROLLBACK


Time: 2020-09-17 09:48:28.934851. Event: DROP TABLE hubspot_data


Time: 2020-09-17 09:48:28.949309. Event: COMMIT


Time: 2020-09-17 09:48:28.949778. Event: ROLLBACK


Time: 2020-09-17 09:48:28.953829. Event: CREATE TABLE hubspot_data (...)


Time: 2020-09-17 09:48:28.973177. Event: COMMIT


Time: 2020-09-17 09:48:28.973652. Event: ROLLBACK

This ETL is the only process using MySQL. I have read the documentation on why deadlocks occur, but I cannot understand how two different tables with no connection to each other can cause a deadlock. I know I could simply re-run the load() method until it succeeds, but I want to understand why the deadlocks happen and how to prevent them.

MySQL version is 8.0.21.
Python 3.8.4.
SQLAlchemy 1.3.19.
pandas 1.0.5.
PyMySQL 0.10.1.

python mysql multithreading sqlalchemy deadlock
3 Answers

Votes: 1

Your question says you are performing inserts from multiple threads. Executing an INSERT requires checking constraints such as primary-key uniqueness and foreign-key validity, and then updating the indexes that represent those constraints. So multiple concurrent updates must lock the indexes for reading and then lock them for writing. From your question, it appears MySQL sometimes gets into a deadlock situation (one thread locking indexes in the order a, b while another locks them in the order b, a). If different threads can insert rows into different tables concurrently, and those tables are related by foreign-key constraints, it is relatively easy for index maintenance to get into a deadlock.

You may be able to work around this by altering the tables you are loading to drop all their indexes (except any autoincrementing primary keys) before the load, and then recreating the indexes afterwards.
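For illustration, a hedged sketch of that idea, assuming the engine created from MYSQL_CONNECTION_STR in the question; the secondary index ix_ident is hypothetical, since the CREATE TABLE statements in the logs define no secondary indexes:

# ix_ident is a hypothetical secondary index; indexed TEXT columns in
# MySQL need a prefix length, hence unique_identifier(64).
with engine.begin() as conn:
    conn.execute("ALTER TABLE cc_influx_share_count DROP INDEX ix_ident")

df.to_sql(name="cc_influx_share_count", con=engine,
          if_exists="append", index=False)

with engine.begin() as conn:
    conn.execute("ALTER TABLE cc_influx_share_count "
                 "ADD INDEX ix_ident (unique_identifier(64))")

Note that this pairs with if_exists="append"; the question's if_exists="replace" drops and recreates the whole table, which discards any secondary indexes anyway.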

Alternatively, you can get rid of the concurrency and run the ETL with a single thread. Because of all the index maintenance, threads do not improve throughput as much as intuition suggests. Avoid running data definition language (CREATE TABLE, CREATE INDEX, and so on) on multiple concurrent threads; troubleshooting that is more trouble than it is worth.

Also, wrapping each chunk of a few hundred rows of INSERTs in an explicit transaction can help ETL throughput in surprising ways. Before each chunk, issue

START TRANSACTION;

and after each chunk issue

COMMIT;

Why does this help? Because COMMIT operations take time, and every operation that is not inside an explicit transaction is followed by an implicit COMMIT.
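In the stack from the question (pandas 1.0, SQLAlchemy 1.3), a minimal sketch of that idea might look as follows; load_chunked, the 500-row chunk size, and the switch to if_exists="append" are illustrative assumptions, with engine being the sqlalchemy.create_engine(MYSQL_CONNECTION_STR) engine from the question:

CHUNK_ROWS = 500  # "a few hundred rows" per explicit transaction

def load_chunked(df: pd.DataFrame, table: str) -> None:
    for start in range(0, len(df), CHUNK_ROWS):
        chunk = df.iloc[start:start + CHUNK_ROWS]
        # engine.begin() opens a connection inside a transaction and commits
        # on exit (rolls back on error), so each chunk pays for exactly one
        # COMMIT instead of one implicit COMMIT per statement.
        with engine.begin() as conn:
            chunk.to_sql(name=table, con=conn,
                         if_exists="append", index=False)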
    


Votes: 0

import logging
from time import sleep


class Query(abc.ABC):
    def __init__(self):
        self.engine = MysqlEngine.engine()  # the answerer's own engine factory
        ....
        ....

    def load(self, df: pd.DataFrame) -> None:
        for i in range(5):  # If load fails due to a deadlock, try 4 more times
            try:
                df.to_sql(
                    name=self.table,
                    con=self.engine.connect(),
                    if_exists="replace",
                    index=False,
                )
                return
            except sqlalchemy.exc.OperationalError as ex:
                if "1213" in repr(ex):  # 1213 = deadlock error code
                    logging.warning(
                        "Failed to acquire lock for %s", self.__class__.__name__
                    )
                sleep(1)

Deadlocks will still occur, and you lose some performance, but it is better than re-running the whole extract and transform.


Votes: 0

You could add a threading.Lock() object to the class so that only one thread at a time inserts into or updates the database. For example:

def load(self, df: pd.DataFrame) -> None:
    with self.lock:
        df.to_sql(name=self.table, con=self.connection,
                  if_exists="replace", index=False)

This should ensure that the first thread to acquire() self.lock (which we added to the class attributes beforehand) finishes the to_sql() call before releasing the lock for the other threads to acquire(). Using the Lock object as a context manager guarantees that it is release()d even if to_sql() raises.
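For completeness, a hedged sketch of the class-attribute declaration this answer refers to, reusing the Query class and MYSQL_CONNECTION_STR from the question:

import abc
import threading

import sqlalchemy


class Query(abc.ABC):
    # Shared by the class and all its subclasses: a single Lock for the
    # whole ETL, so the DROP/CREATE/INSERT sequences never interleave.
    lock = threading.Lock()

    def __init__(self):
        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)

    # load() as defined above does `with self.lock:`, which resolves to
    # this class attribute.

Since every load() then runs under the one lock, the thread pool's concurrency applies only to extract() and transform().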
