使用 SQLAlchemy 执行更新语句不会更新行,但语句在 MySQL Workbench 中执行时完美运行

问题描述 投票:0回答:1

我有以下代码:

from sqlalchemy import create_engine

SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()        
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"')
    cursor_obj.close()
finally:
    connection.close()

如果我从数据库中选择全部,那么我可以看到未更新的行。但是,如果我像这样打印声明:

print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')

输出为:

UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"

如果我将其复制并粘贴到 MySQL 工作台中,它将执行,并且我可以看到行已更新。

我已在工作台中禁用安全更新,并尝试在执行语句之前将其添加为

cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')

但这也行不通。这是另一件令人困惑的事情,在我的代码的前面我运行了以下内容:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    result = cursor_obj.fetchall()
    cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
    cursor_obj.close()
finally:
    connection.close()

此代码中的更新语句运行良好,没有任何问题。我还尝试将 echo=True 添加到我的创建引擎行中:

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)

输出为:

2021-12-31 10:17:09,613 信息 sqlalchemy.engine.Engine 显示类似“sql_mode”的变量

2021-12-31 10:17:09,616 信息 sqlalchemy.engine.Engine [原始 sql] {}

2021-12-31 10:17:09,700 信息 sqlalchemy.engine.Engine 显示类似“lower_case_table_names”的变量

2021-12-31 10:17:09,701 信息 sqlalchemy.engine.Engine [在 0.00143 秒内生成] {}

2021-12-31 10:17:09,858 信息 sqlalchemy.engine.Engine 选择数据库()

2021-12-31 10:17:09,859 信息 sqlalchemy.engine.Engine [原始 sql] {}

这不是很有用。

我也尝试过:

from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"'))

这会出现以下错误:

TypeError:“TextClause”类型的对象没有 len()

不太确定从这里该去哪里。

python mysql sqlalchemy pymysql
1个回答
2
投票

在 Python 中使用字符串格式创建 SQL 语句很容易出错,如果有更好的工具可用,应该避免。

您可以像这样使用 SQLAlchemy core 运行原始查询,而不必下拉到原始连接:

import sqlalchemy as sa

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")


# Reflect the database table into an object  
tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
# Create an update object
upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)

# The "begin" context manager will automatically commit on exit
with engine.begin() as conn:
    conn.execute(upd)

如果您需要使用原始 SQL,您可以这样做(请参阅使用文本 SQL):

# We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID  = :granule_id'

values = {
    'process_start_time': process_start_time,
    'process_end_time': process_end_time,
    'granule_id': granule_id,
}

with engine.begin() as conn:
    conn.execute(sa.text(stmt), values)
© www.soinside.com 2019 - 2024. All rights reserved.