我在 docker 容器内有一个多线程 ETL 进程,看起来像这个简化的代码:
class Query(abc.ABC):
    """Base class for one ETL unit: extract() -> transform() -> load() into MySQL."""

    def __init__(self):
        # NOTE(review): create_engine() builds a connection *pool*, not a single
        # connection, and here every Query instance gets its own pool. Sharing
        # one module-level engine across all queries is the usual pattern.
        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)

    def load(self, df: pd.DataFrame) -> None:
        """Replace table ``self.table`` with the contents of *df*.

        ``if_exists="replace"`` makes pandas DROP and re-CREATE the table on
        every load, so each call issues DDL (visible as the DROP/CREATE pairs
        in the general_log below).
        """
        df.to_sql(
            name=self.table, con=self.connection, if_exists="replace", index=False,
        )

    @abc.abstractmethod
    def transform(self, data: object) -> pd.DataFrame:
        """Turn the raw extracted data into a DataFrame ready for loading."""

    @abc.abstractmethod
    def extract(self) -> object:
        """Fetch raw data from the source system."""

    # other methods...
class ComplianceInfluxQuery(Query):
    """Concrete query; implements extract()/transform(), inherits load() from Query."""
    # Implements abstract methods... load method is the same as Query class


ALL_QUERIES = [
    ComplianceInfluxQuery("cc_influx_share_count"),
    ComplianceInfluxQuery("cc_influx_we_count"),
    # ... more queries ...
]

while True:
    with ThreadPoolExecutor(max_workers=8) as pool:
        for query in ALL_QUERIES:
            # execute_etl calls extract(), transform() and load()
            pool.submit(execute_etl, query)
许多类继承自 Query,与
load()
具有相同的实现,如类 Query
所示,它只是将 pandas DataFrame 对象加载到 SQL 表中,并替换该表(如果存在)。
所有类同时运行,并在完成
Extract()
和Transform()
后将结果加载到MySQL数据库。
每个类都会向数据库加载不同的表。
当调用
load()
方法时,我经常会遇到来自随机线程的死锁:
2020-09-17 09:48:28,138 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,160 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,241 | ERROR | root | calculate | 124 | 1 | Failed to execute query ComplianceInfluxQuery
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
result = self._query(query)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
conn.query(q)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
result.read()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
first_packet = self.connection._read_packet()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
packet.raise_for_error()
File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "calculate.py", line 119, in execute_etl
query.run()
File "/chil_etl/query.py", line 45, in run
self.load(df)
File "/chil_etl/query.py", line 22, in load
df.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/core/generic.py", line 2653, in to_sql
sql.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 512, in to_sql
pandas_sql.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 1316, in to_sql
table.create()
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 649, in create
self._execute_create()
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 641, in _execute_create
self.table.create()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 927, in create
bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2097, in _run_visitor
conn._run_visitor(visitorcallable, element, **kwargs)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1656, in _run_visitor
visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 145, in traverse_single
return meth(obj, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 827, in visit_table
self.connection.execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
return connection._execute_ddl(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
result = self._query(query)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
conn.query(q)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
result.read()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
first_packet = self.connection._read_packet()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
packet.raise_for_error()
File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL:
CREATE TABLE cc_influx_share_count (
unique_identifier TEXT,
nfs_share_count FLOAT(53),
smb_share_count FLOAT(53),
s3_bucket_count FLOAT(53)
)
]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
日志显示两个线程几乎同时调用了
load()
方法。
无论数据如何,这都可能发生在所有类别中。
我运行了命令
SHOW ENGINE INNODB STATUS
,那里没有列出死锁。
我检查了general_log表以更好地了解死锁期间发生的情况,但除了死锁的线程没有向表中插入任何值
cc_influx_share_count
当(我认为)它应该有的事实之外,没有注意到任何有用的东西:
SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;
输出(敏感数据已删除):
Time: 2020-09-17 09:48:27.010747. Event: COMMIT
Time: 2020-09-17 09:48:27.011075. Event: ROLLBACK
Time: 2020-09-17 09:48:27.012042. Event: CREATE TABLE cc_influx_last_metric_time (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
uptime BIGINT
)
Time: 2020-09-17 09:48:27.033973. Event: COMMIT
Time: 2020-09-17 09:48:27.034327. Event: ROLLBACK
Time: 2020-09-17 09:48:27.053837. Event: INSERT INTO cc_influx_last_metric_time (unique_identifier, timestamp, uptime) VALUES (...)
Time: 2020-09-17 09:48:27.066930. Event: COMMIT
Time: 2020-09-17 09:48:27.068657. Event: ROLLBACK
Time: 2020-09-17 09:48:27.887579. Event: DESCRIBE `container_post_deployments`
Time: 2020-09-17 09:48:27.889705. Event: ROLLBACK
Time: 2020-09-17 09:48:27.890186. Event: DESCRIBE `container_post_deployments`
Time: 2020-09-17 09:48:27.892125. Event: ROLLBACK
Time: 2020-09-17 09:48:27.892619. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:27.894964. Event: SHOW CREATE TABLE `container_post_deployments`
Time: 2020-09-17 09:48:27.896491. Event: ROLLBACK
Time: 2020-09-17 09:48:27.897097. Event: DROP TABLE container_post_deployments
Time: 2020-09-17 09:48:27.907816. Event: COMMIT
Time: 2020-09-17 09:48:27.908322. Event: ROLLBACK
Time: 2020-09-17 09:48:27.909890. Event: CREATE TABLE container_post_deployments (
image TEXT,
`clientId` TEXT,
message TEXT,
timestamp TIMESTAMP NULL,
status_code BIGINT,
something TEXT,
user_agent TEXT
)
Time: 2020-09-17 09:48:27.928665. Event: COMMIT
Time: 2020-09-17 09:48:27.929089. Event: ROLLBACK
Time: 2020-09-17 09:48:27.932310. Event: INSERT INTO container_post_deployments (image, `clientId`, message, timestamp, status_code, something, user_agent) VALUES (...)
Time: 2020-09-17 09:48:27.934410. Event: COMMIT
Time: 2020-09-17 09:48:27.936774. Event: ROLLBACK
Time: 2020-09-17 09:48:28.140219. Event: DESCRIBE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.163517. Event: DESCRIBE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.166070. Event: ROLLBACK
Time: 2020-09-17 09:48:28.168159. Event: DESCRIBE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.169895. Event: ROLLBACK
Time: 2020-09-17 09:48:28.170583. Event: DESCRIBE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.174444. Event: ROLLBACK
Time: 2020-09-17 09:48:28.176339. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.177915. Event: ROLLBACK
Time: 2020-09-17 09:48:28.179331. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.182284. Event: SHOW CREATE TABLE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.185154. Event: ROLLBACK
Time: 2020-09-17 09:48:28.192493. Event: SHOW CREATE TABLE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.192887. Event: DROP TABLE cc_influx_share_count
Time: 2020-09-17 09:48:28.194530. Event: ROLLBACK
Time: 2020-09-17 09:48:28.195707. Event: DROP TABLE cc_influx_we_count
Time: 2020-09-17 09:48:28.207712. Event: COMMIT
Time: 2020-09-17 09:48:28.208141. Event: ROLLBACK
Time: 2020-09-17 09:48:28.210087. Event: CREATE TABLE cc_influx_share_count (
unique_identifier TEXT,
nfs_share_count FLOAT(53),
smb_share_count FLOAT(53),
s3_bucket_count FLOAT(53)
)
Time: 2020-09-17 09:48:28.215350. Event: COMMIT
Time: 2020-09-17 09:48:28.216115. Event: ROLLBACK
Time: 2020-09-17 09:48:28.217996. Event: CREATE TABLE cc_influx_we_count (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
`ANF` FLOAT(53),
`S3` FLOAT(53),
`CVO` FLOAT(53)
)
Time: 2020-09-17 09:48:28.240455. Event: ROLLBACK
Time: 2020-09-17 09:48:28.240908. Event: ROLLBACK
Time: 2020-09-17 09:48:28.244425. Event: COMMIT
Time: 2020-09-17 09:48:28.244965. Event: ROLLBACK
Time: 2020-09-17 09:48:28.249009. Event: INSERT INTO cc_influx_we_count (unique_identifier, timestamp, `ANF`, `S3`, `CVO`) VALUES (...)
Time: 2020-09-17 09:48:28.253638. Event: COMMIT
Time: 2020-09-17 09:48:28.256299. Event: ROLLBACK
Time: 2020-09-17 09:48:28.525814. Event: DESCRIBE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.530211. Event: ROLLBACK
Time: 2020-09-17 09:48:28.532392. Event: DESCRIBE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.539685. Event: ROLLBACK
Time: 2020-09-17 09:48:28.541868. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.560271. Event: SHOW CREATE TABLE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.565451. Event: ROLLBACK
Time: 2020-09-17 09:48:28.569257. Event: DROP TABLE cc_influx_disk_usage
Time: 2020-09-17 09:48:28.585562. Event: COMMIT
Time: 2020-09-17 09:48:28.595193. Event: ROLLBACK
Time: 2020-09-17 09:48:28.598230. Event: CREATE TABLE cc_influx_disk_usage (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
total_gb FLOAT(53),
used_gb FLOAT(53)
)
Time: 2020-09-17 09:48:28.619580. Event: COMMIT
Time: 2020-09-17 09:48:28.620411. Event: ROLLBACK
Time: 2020-09-17 09:48:28.625385. Event: INSERT INTO cc_influx_disk_usage (unique_identifier, timestamp, total_gb, used_gb) VALUES (....)
Time: 2020-09-17 09:48:28.628706. Event: COMMIT
Time: 2020-09-17 09:48:28.631955. Event: ROLLBACK
Time: 2020-09-17 09:48:28.840143. Event: DESCRIBE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.844303. Event: ROLLBACK
Time: 2020-09-17 09:48:28.845637. Event: DESCRIBE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.848076. Event: ROLLBACK
Time: 2020-09-17 09:48:28.848646. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.851165. Event: SHOW CREATE TABLE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.852202. Event: ROLLBACK
Time: 2020-09-17 09:48:28.852691. Event: DROP TABLE cc_influx_aws_subscription
Time: 2020-09-17 09:48:28.861657. Event: COMMIT
Time: 2020-09-17 09:48:28.862099. Event: ROLLBACK
Time: 2020-09-17 09:48:28.863288. Event: CREATE TABLE cc_influx_aws_subscription (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
is_subscribed BIGINT
)
Time: 2020-09-17 09:48:28.878554. Event: COMMIT
Time: 2020-09-17 09:48:28.879113. Event: ROLLBACK
Time: 2020-09-17 09:48:28.881054. Event: INSERT INTO cc_influx_aws_subscription (unique_identifier, timestamp, is_subscribed) VALUES (....)
Time: 2020-09-17 09:48:28.882642. Event: COMMIT
Time: 2020-09-17 09:48:28.884614. Event: ROLLBACK
Time: 2020-09-17 09:48:28.918677. Event: DESCRIBE `hubspot_data`
Time: 2020-09-17 09:48:28.922938. Event: ROLLBACK
Time: 2020-09-17 09:48:28.923993. Event: DESCRIBE `hubspot_data`
Time: 2020-09-17 09:48:28.928181. Event: ROLLBACK
Time: 2020-09-17 09:48:28.928808. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.931225. Event: SHOW CREATE TABLE `hubspot_data`
Time: 2020-09-17 09:48:28.934269. Event: ROLLBACK
Time: 2020-09-17 09:48:28.934851. Event: DROP TABLE hubspot_data
Time: 2020-09-17 09:48:28.949309. Event: COMMIT
Time: 2020-09-17 09:48:28.949778. Event: ROLLBACK
Time: 2020-09-17 09:48:28.953829. Event: CREATE TABLE hubspot_data (...)
Time: 2020-09-17 09:48:28.973177. Event: COMMIT
Time: 2020-09-17 09:48:28.973652. Event: ROLLBACK
此 ETL 是唯一运行 MySQL 的进程。
我已阅读有关“为什么会发生死锁”的文档,但我无法理解两个互不关联的不同表之间怎么会产生死锁。
我知道我可以简单地再次运行 load()
方法直到成功,但我想了解为什么会发生死锁,以及如何防止死锁。
您的问题说您从多个线程执行插入。执行 INSERT 需要检查主键唯一性和外键有效性等约束,然后更新表示这些约束的索引。所以多个并发更新
必须先锁定索引进行读取,再锁定索引进行写入;多个线程以不同顺序获取这些锁时,就可能互相等待而形成死锁。
您可以通过在执行加载之前更改要填充的表以删除所有索引(自动增量主键除外),然后在之后重新创建它们来解决此问题。
或者,您可以摆脱并发性,只用一个线程执行
ETL 的 Load(加载)阶段。由于所有索引维护开销,多线程并不能像直觉上那样提高吞吐量。避免在多个并发线程上运行数据定义语言(CREATE TABLE、CREATE INDEX 等)——对这类问题进行故障排除的代价远超其收益。
此外,将每数百行左右的一批 INSERT 包装在一个事务中,可以显著提升 ETL 吞吐量。在每个批次之前,执行
BEGIN TRANSACTION;
在每个块之后说
COMMIT;
为什么这有帮助?因为 COMMIT 操作需要时间,而不在显式事务中的每个操作后面都会有一个隐式 COMMIT。

class Query(abc.ABC):
def __init__(self):
    # Obtain a single shared engine (connection pool) from a factory instead
    # of creating one engine per instance as the original question's code did.
    self.engine = MysqlEngine.engine()
    # ... other initialisation omitted ...
def load(self, df: pd.DataFrame) -> None:
    """Replace table ``self.table`` with *df*, retrying on MySQL deadlock (1213).

    Makes up to 5 attempts, sleeping 1 second between deadlock retries.
    Raises:
        sqlalchemy.exc.OperationalError: immediately for non-deadlock errors,
        or the last deadlock error once all attempts are exhausted — callers
        must not be left believing a silently-dropped load succeeded.
    """
    attempts = 5
    for attempt in range(attempts):
        try:
            # Pass the engine itself: pandas checks out a pooled connection
            # and returns it when done. The original `self.engine.connect()`
            # leaked one never-closed connection per attempt.
            df.to_sql(
                name=self.table,
                con=self.engine,
                if_exists="replace",
                index=False,
            )
            return
        except sqlalchemy.exc.OperationalError as ex:
            # MySQL error 1213 = chosen as deadlock victim; safe to retry.
            # Anything else is not transient — surface it right away instead
            # of silently retrying as the original did.
            if "1213" not in repr(ex):
                raise
            logging.warning(
                "Failed to acquire lock for %s (attempt %d/%d)",
                self.__class__.__name__, attempt + 1, attempts,
            )
            if attempt == attempts - 1:
                raise  # retries exhausted: propagate instead of dropping the load
            sleep(1)
死锁仍然可能发生,并且会损失一些性能,但这总比重新执行整个 Extract-Transform 流程要好。