我正在从 Amazon Redshift 获取数据。 更准确地说,我正在查询表
stl_query_text
以提取在特定日期运行的查询列表。
这是使用 SqlAlchemy 获取数据的 Python 脚本。
from sqlalchemy import create_engine
_QUERY = """
SELECT
q.query AS query_id,
qt.text
FROM
pg_catalog.stl_query AS q
INNER JOIN
pg_catalog.stl_querytext AS qt
ON q.query = qt.query
WHERE TRUE
AND DATE(q.starttime) = '2023-10-24'
AND EXTRACT('hour' FROM q.starttime) = 19
"""
uri = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**_secrets)
engine = create_engine(uri)
with engine.connect() as connection:
result = connection.execute(_QUERY)
for row in result:
print(row)
直到最近它都运行良好。它失败并出现以下错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc2 in position 656: invalid continuation byte
显然,某些查询的文本中有奇怪的字符序列。如果我在没有
qt.text
的情况下运行相同的查询,它工作得很好。但我需要文字。
我尝试了很多方法,但找不到任何解决方法:
如果有效的话将是最好的选择。我已尝试按照 here
的描述在 URI 中指定字符集client_encoding=utf8
=> 我收到同样的错误,因为它是默认编码client_encoding=latin1
=> 致命:参数“client_encoding”的值无效:“latin1”
ISO-8859-1
这将是部分获取,但我对此很满意。我真的不需要错误的行。 但是,我无法让它发挥作用。
使用
try ... except
或 throw instead of rise
,如此处所述
它部分工作:我可以获取所有行,直到游标遇到错误行。之后,生成器将停止,如线程中所述:
然而,这样做的缺点是您仍然必须在生成器中放置通用的异常处理,这可能会产生错误。这是不可能解决的,因为在生成器中引发任何异常都会将其关闭。
问题是生成器在SqlAlchemy内部,我无法修改它。
我还尝试忽略如上所述的编码错误这里
engine = create_engine(uri,encoding_errors='忽略')
但显然这个参数不被接受:
TypeError:使用配置 PGDialect_psycopg2/QueuePool/Engine,发送到 create_engine() 的参数“encoding_errors”无效。请检查关键字参数是否适合此组件组合。
我尝试调整查询以排除有错误的行。
例如:
SELECT
q.query AS query_id,
qt.text
FROM
pg_catalog.stl_query AS q
INNER JOIN
pg_catalog.stl_querytext AS qt
ON q.query = qt.query
WHERE TRUE
AND DATE(q.starttime) = '2023-10-24'
AND EXTRACT('hour' FROM q.starttime) = 19
AND qt.text NOT LIKE '%0xc2%'
它不起作用,但这并不奇怪,因为我正在 VARCHAR 中搜索字节序列。
我尝试用 redshift-connector 替换 SqlAlchemy 但出现相同的解码错误。
任何想法将不胜感激!非常感谢
指定编码:
不要更改
client_encoding
,而是直接在连接字符串中指定编码,如下所示:
engine = create_engine(uri, encoding='ISO-8859-1')
使用原始连接绕过 SQLAlchemy 解码:
使用原始数据库连接将绕过 SQLAlchemy 的解码过程。您仍然可以使用
psycopg2
来实现此目的。
代码:
import psycopg2
conn = psycopg2.connect("dbname='your_db' user='your_user' host='your_host' password='your_password'")
cur = conn.cursor()
cur.execute(_QUERY)
rows = cur.fetchall()
for row in rows:
try:
print(row[1].decode('utf-8'))
except UnicodeDecodeError:
print(row[1].decode('ISO-8859-1'))
如果这些方法不起作用,可以尝试删除所有非 UTF-8 支持的字符作为长期解决方案。